activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r990107 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/network/ activemq-core/src/main/java/org/apache/activemq/transport/ activemq-core/src/main/java/org/apache/activemq/transport/tcp/ activemq-optional/src/main/java...
Date Fri, 27 Aug 2010 11:34:04 GMT
Author: gtully
Date: Fri Aug 27 11:34:04 2010
New Revision: 990107

URL: http://svn.apache.org/viewvc?rev=990107&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2764 - add inactivity monitor to http
transport, enabled by default with 30 second default idle timeout, configure via transport
url params as there is no negeotion via wire format info over http. Now a duplex return connection
can expire when there is no inactivity. Also processed shutdown so in the normal case a duplex
connector can be disposed when the remote broker stops, the inactivity monitor deal with the
abortive closure case

Added:
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/bugs/
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/bugs/AMQ2764Test.java
  (with props)
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker1.xml
  (with props)
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker2.xml
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpEmbeddedTunnelServlet.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransportFactory.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransportServer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=990107&r1=990106&r2=990107&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Fri Aug 27 11:34:04 2010
@@ -91,7 +91,7 @@ import org.apache.commons.logging.LogFac
  * @version $Revision$
  */
 public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware
{
-    private static final Log LOG = LogFactory.getLog(DemandForwardingBridge.class);
+    private static final Log LOG = LogFactory.getLog(DemandForwardingBridgeSupport.class);
     private static final ThreadPoolExecutor ASYNC_TASKS;
     protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
     protected final Transport localBroker;
@@ -527,6 +527,12 @@ public abstract class DemandForwardingBr
                                     LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
                                 }
                                 break;
+                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
+                                // initiator is shutting down, controlled case
+                                // abortive close dealt with by inactivity monitor
+                                LOG.info("Stopping network bridge on shutdown of remote broker");
+                                serviceRemoteException(new IOException(command.toString()));
+                                break;
                             default:
                                 if (LOG.isDebugEnabled()) {
                                     LOG.debug("Ignoring remote command: " + command);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=990107&r1=990106&r2=990107&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Fri Aug 27 11:34:04 2010
@@ -44,6 +44,7 @@ public class InactivityMonitor extends T
     private static final ThreadPoolExecutor ASYNC_TASKS;
 
     private static int CHECKER_COUNTER;
+    private static long DEFAULT_CHECK_TIME_MILLS = 30000;
     private static Timer  READ_CHECK_TIMER;
     private static Timer  WRITE_CHECK_TIMER;
 
@@ -63,9 +64,11 @@ public class InactivityMonitor extends T
     private SchedulerTimerTask readCheckerTask;
 
     private boolean ignoreRemoteWireFormat = false;
-    private long readCheckTime;
-    private long writeCheckTime;
-    private long initialDelayTime;
+    private boolean ignoreAllWireFormatInfo = false;
+
+    private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
+    private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
+    private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
     private boolean useKeepAlive = true;
     private boolean keepAliveResponseRequired;
     private WireFormat wireFormat;
@@ -115,6 +118,14 @@ public class InactivityMonitor extends T
     public InactivityMonitor(Transport next, WireFormat wireFormat) {
         super(next);
         this.wireFormat = wireFormat;
+        if (this.wireFormat == null) {
+            this.ignoreAllWireFormatInfo = true;
+        }
+    }
+
+    public void start() throws Exception {
+        next.start();
+        startMonitorThreads();
     }
 
     public void stop() throws Exception {
@@ -268,23 +279,29 @@ public class InactivityMonitor extends T
         ignoreRemoteWireFormat = val;
     }
 
+    public long getReadCheckTime() {
+        return readCheckTime;
+    }
+
+    public void setReadCheckTime(long readCheckTime) {
+        this.readCheckTime = readCheckTime;
+    }
+
+    public long getInitialDelayTime() {
+        return initialDelayTime;
+    }
+
+    public void setInitialDelayTime(long initialDelayTime) {
+        this.initialDelayTime = initialDelayTime;
+    }
+    
     private synchronized void startMonitorThreads() throws IOException {
         if (monitorStarted.get()) {
             return;
         }
-        if (localWireFormatInfo == null) {
-            return;
-        }
-        if (remoteWireFormatInfo == null) {
-            return;
-        }
 
-        if (!ignoreRemoteWireFormat) {
-            readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
-            initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(),
remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
-        } else {
-            readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
-            initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay();
+        if (!configuredOk()) {
+            return;
         }
 
         if (readCheckTime > 0) {
@@ -304,6 +321,23 @@ public class InactivityMonitor extends T
         }
     }
 
+    private boolean configuredOk() throws IOException {
+        boolean configured = false;
+        if (ignoreAllWireFormatInfo) {
+            configured = true;
+        } else if (localWireFormatInfo != null && remoteWireFormatInfo != null) {
+            if (!ignoreRemoteWireFormat) {
+                readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(),
remoteWireFormatInfo.getMaxInactivityDuration());
+                initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(),
remoteWireFormatInfo.getMaxInactivityDurationInitalDelay());
+            } else {
+                readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
+                initialDelayTime = localWireFormatInfo.getMaxInactivityDurationInitalDelay();
+            }
+            configured = true;
+        }
+        return configured;
+    }
+
     /**
      *
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=990107&r1=990106&r2=990107&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
Fri Aug 27 11:34:04 2010
@@ -294,4 +294,11 @@ public abstract class TransportFactory {
         return transport;
     }
 
+    protected String getOption(Map options, String key, String def) {
+        String rc = (String) options.remove(key);
+        if( rc == null ) {
+            rc = def;
+        }
+        return rc;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java?rev=990107&r1=990106&r2=990107&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java
Fri Aug 27 11:34:04 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport;
 
 import java.net.URI;
+import java.util.Map;
 
 import org.apache.activemq.util.ServiceSupport;
 
@@ -30,6 +31,7 @@ public abstract class TransportServerSup
     private URI connectURI;
     private URI bindLocation;
     private TransportAcceptListener acceptListener;
+    protected Map<String, Object> transportOptions;
 
     public TransportServerSupport() {
     }
@@ -82,4 +84,8 @@ public abstract class TransportServerSup
     public void setBindLocation(URI bindLocation) {
         this.bindLocation = bindLocation;
     }
+
+    public void setTransportOption(Map<String, Object> transportOptions) {
+        this.transportOptions = transportOptions;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=990107&r1=990106&r2=990107&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
Fri Aug 27 11:34:04 2010
@@ -111,14 +111,6 @@ public class TcpTransportFactory extends
         return super.compositeConfigure(transport, format, options);
     }
 
-    private String getOption(Map options, String key, String def) {
-        String rc = (String) options.remove(key);
-        if( rc == null ) {
-            rc = def;
-        }
-        return rc;
-    }
-
     /**
      * Returns true if the inactivity monitor should be used on the transport
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=990107&r1=990106&r2=990107&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
Fri Aug 27 11:34:04 2010
@@ -106,7 +106,6 @@ public class TcpTransportServer extends 
      * This parameter is most probably set in Connection or TransportConnector URIs.
      */
     protected boolean startLogging = true;
-    protected Map<String, Object> transportOptions;
     protected final ServerSocketFactory serverSocketFactory;
     protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
     protected Thread socketHandlerThread;
@@ -387,10 +386,6 @@ public class TcpTransportServer extends 
         return (InetSocketAddress)serverSocket.getLocalSocketAddress();
     }
 
-    public void setTransportOption(Map<String, Object> transportOptions) {
-        this.transportOptions = transportOptions;
-    }
-    
     protected final void handleSocket(Socket socket) {
         try {
             if (this.currentTransportCount >= this.maximumConnections) {

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpEmbeddedTunnelServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpEmbeddedTunnelServlet.java?rev=990107&r1=990106&r2=990107&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpEmbeddedTunnelServlet.java
(original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpEmbeddedTunnelServlet.java
Fri Aug 27 11:34:04 2010
@@ -43,7 +43,8 @@ public class HttpEmbeddedTunnelServlet e
 
                 // Add the servlet connector
                 String url = getConnectorURL();
-                transportConnector = new HttpTransportServer(new URI(url));
+                HttpTransportFactory factory = new HttpTransportFactory();
+                transportConnector = (HttpTransportServer) factory.doBind(new URI(url));
                 broker.addConnector(transportConnector);
 
                 String brokerURL = getServletContext().getInitParameter("org.apache.activemq.brokerURL");

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java?rev=990107&r1=990106&r2=990107&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java
(original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java
Fri Aug 27 11:34:04 2010
@@ -18,14 +18,22 @@ package org.apache.activemq.transport.ht
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.ThreadNameFilter;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportLoggerFactory;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.util.TextWireFormat;
 import org.apache.activemq.transport.xstream.XStreamWireFormat;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,7 +47,15 @@ public class HttpTransportFactory extend
     private static final Log LOG = LogFactory.getLog(HttpTransportFactory.class);
 
     public TransportServer doBind(URI location) throws IOException {
-        return new HttpTransportServer(location);
+        try {
+            Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
+            HttpTransportServer result = new HttpTransportServer(location, this);
+            Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options,
"transport.");
+            result.setTransportOption(transportOptions);
+            return result;
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
     }
 
     protected TextWireFormat asTextWireFormat(WireFormat wireFormat) {
@@ -59,16 +75,26 @@ public class HttpTransportFactory extend
         return new HttpClientTransport(textWireFormat, location);
     }
 
+    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options)
throws Exception {
+        return compositeConfigure(transport, format, options);
+    }
+
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options)
{
-        HttpClientTransport httpTransport = (HttpClientTransport) super.compositeConfigure(transport,
format, options);
-        transport = httpTransport;
-        if( httpTransport.isTrace() ) {
+        transport = super.compositeConfigure(transport, format, options);
+        HttpClientTransport httpTransport = (HttpClientTransport)transport.narrow(HttpClientTransport.class);
+        if(httpTransport != null && httpTransport.isTrace() ) {
             try {
                 transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
             } catch (Throwable e) {
                 LOG.error("Could not create TransportLogger object for: " + TransportLoggerFactory.defaultLogWriterName
+ ", reason: " + e, e);
             }
         }
+        boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor",
"true"));
+        if (useInactivityMonitor) {
+            transport = new InactivityMonitor(transport, null /* ignore wire format as no
negotiation over http */);
+            IntrospectionSupport.setProperties(transport, options);
+        }
+
         return transport;
     }
 

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java?rev=990107&r1=990106&r2=990107&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
(original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java
Fri Aug 27 11:34:04 2010
@@ -41,10 +41,12 @@ public class HttpTransportServer extends
     private TextWireFormat wireFormat;
     private Server server;
     private Connector connector;
+    private HttpTransportFactory transportFactory;
 
-    public HttpTransportServer(URI uri) {
+    public HttpTransportServer(URI uri, HttpTransportFactory factory) {
         super(uri);
         this.bindAddress = uri;
+        this.transportFactory = factory;
     }
 
     public void setBrokerInfo(BrokerInfo brokerInfo) {
@@ -112,6 +114,8 @@ public class HttpTransportServer extends
 
         contextHandler.setAttribute("acceptListener", getAcceptListener());
         contextHandler.setAttribute("wireFormat", getWireFormat());
+        contextHandler.setAttribute("transportFactory", transportFactory);
+        contextHandler.setAttribute("transportOptions", transportOptions);
         server.start();
     }
 

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java?rev=990107&r1=990106&r2=990107&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
(original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
Fri Aug 27 11:34:04 2010
@@ -31,9 +31,12 @@ import javax.servlet.http.HttpServletReq
 import javax.servlet.http.HttpServletResponse;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.transport.InactivityMonitor;
+import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.util.TextWireFormat;
 import org.apache.activemq.transport.xstream.XStreamWireFormat;
+import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -49,9 +52,11 @@ public class HttpTunnelServlet extends H
     private static final Log LOG = LogFactory.getLog(HttpTunnelServlet.class);
 
     private TransportAcceptListener listener;
+    private HttpTransportFactory transportFactory;
     private TextWireFormat wireFormat;
     private final Map<String, BlockingQueueTransport> clients = new HashMap<String,
BlockingQueueTransport>();
     private final long requestTimeout = 30000L;
+    private HashMap transportOptions;
 
     @Override
     public void init() throws ServletException {
@@ -60,6 +65,11 @@ public class HttpTunnelServlet extends H
         if (listener == null) {
             throw new ServletException("No such attribute 'acceptListener' available in the
ServletContext");
         }
+        transportFactory = (HttpTransportFactory)getServletContext().getAttribute("transportFactory");
+        if (transportFactory == null) {
+            throw new ServletException("No such attribute 'transportFactory' available in
the ServletContext");    
+        }
+        transportOptions = (HashMap)getServletContext().getAttribute("transportOptions");
         wireFormat = (TextWireFormat)getServletContext().getAttribute("wireFormat");
         if (wireFormat == null) {
             wireFormat = createWireFormat();
@@ -174,14 +184,20 @@ public class HttpTunnelServlet extends H
         synchronized (this) {
             BlockingQueueTransport answer = clients.get(clientID);
             if (answer != null) {
-                response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID
'" + clientID + "' has allready been established");
-                LOG.warn("A session for clientID '" + clientID + "' has allready been established");
+                response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID
'" + clientID + "' has already been established");
+                LOG.warn("A session for clientID '" + clientID + "' has already been established");
                 return null;
             }
 
             answer = createTransportChannel();
             clients.put(clientID, answer);
-            listener.onAccept(answer);
+            Transport transport = answer;
+            try {
+                transport = transportFactory.serverConfigure(answer, null, transportOptions);
+            } catch (Exception e) {
+                IOExceptionSupport.create(e);
+            }
+            listener.onAccept(transport);
             //wait for the transport to connect
             while (!answer.isConnected()) {
             	try {

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransportFactory.java?rev=990107&r1=990106&r2=990107&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransportFactory.java
(original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransportFactory.java
Fri Aug 27 11:34:04 2010
@@ -33,11 +33,11 @@ import org.apache.activemq.wireformat.Wi
 public class HttpsTransportFactory extends HttpTransportFactory {
     
     public TransportServer doBind(String brokerId, URI location) throws IOException {
-        return new HttpsTransportServer(location);
+        return new HttpsTransportServer(location, this);
     }
     
     public TransportServer doBind(URI location) throws IOException {
-        return new HttpsTransportServer(location);
+        return new HttpsTransportServer(location, this);
     }
 
     protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException
{

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransportServer.java?rev=990107&r1=990106&r2=990107&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransportServer.java
(original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/https/HttpsTransportServer.java
Fri Aug 27 11:34:04 2010
@@ -32,8 +32,8 @@ public class HttpsTransportServer extend
     private String keyCertificateAlgorithm;
     private String protocol;
 
-    public HttpsTransportServer(URI uri) {
-        super(uri);
+    public HttpsTransportServer(URI uri, HttpsTransportFactory factory) {
+        super(uri, factory);
     }
 
     public void doStart() throws Exception {

Added: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/bugs/AMQ2764Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/bugs/AMQ2764Test.java?rev=990107&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/bugs/AMQ2764Test.java
(added)
+++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/bugs/AMQ2764Test.java
Fri Aug 27 11:34:04 2010
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.ConsumerEvent;
+import org.apache.activemq.advisory.ConsumerEventSource;
+import org.apache.activemq.advisory.ConsumerListener;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class AMQ2764Test extends TestCase {
+
+    private static final Log LOG = LogFactory.getLog(AMQ2764Test.class);
+
+    private BrokerService brokerOne;
+    private BrokerService brokerTwo;
+    private Destination destination;
+    private ArrayList<Connection> connections = new ArrayList<Connection>();
+
+    public void testBrokerRestart() throws Exception {
+
+        startBrokerOne();
+        Thread.sleep(5000);
+        startBrokerTwo();
+        Thread.sleep(5000);
+
+        ActiveMQConnectionFactory producerConnectionFactory = createBrokerOneConnectionFactory();
+        ActiveMQConnectionFactory secondProducerConnectionFactory = createBrokerTwoConnectionFactory();
+        ActiveMQConnectionFactory consumerConnectionFactory = createBrokerOneConnectionFactory();
+
+        MessageConsumer consumer = createConsumer(consumerConnectionFactory);
+        AtomicInteger counter = createConsumerCounter(consumerConnectionFactory);
+        waitForConsumerToArrive(counter);
+
+        final int expectedMessagesReceived = 25;
+        int actualMessagesReceived = doSendMessage(expectedMessagesReceived, consumer, producerConnectionFactory);
+        assertEquals("Didn't receive the right amount of messages directly connected", expectedMessagesReceived,
actualMessagesReceived);
+        assertNull( "Had extra messages", consumer.receiveNoWait());
+
+        actualMessagesReceived = doSendMessage(expectedMessagesReceived, consumer, secondProducerConnectionFactory);
+        assertEquals("Didn't receive the right amount of messages via network", expectedMessagesReceived,
actualMessagesReceived);
+        assertNull( "Had extra messages", consumer.receiveNoWait());
+
+        LOG.info("Stopping broker one");
+        stopBrokerOne();
+
+        TimeUnit.SECONDS.sleep(1);
+        LOG.info("Restarting broker");
+        startBrokerOne();
+
+        consumer = createConsumer(consumerConnectionFactory);
+        counter = createConsumerCounter(consumerConnectionFactory);
+        waitForConsumerToArrive(counter);
+
+        actualMessagesReceived = doSendMessage(expectedMessagesReceived, consumer, secondProducerConnectionFactory);
+        assertEquals("Didn't receive the right amount of messages via network after restart",
expectedMessagesReceived, actualMessagesReceived);
+        assertNull( "Had extra messages", consumer.receiveNoWait());
+
+        stopBrokerOne();
+        stopBrokerTwo();
+    }
+
+    protected int doSendMessage(int expectedMessagesReceived, MessageConsumer consumer, ActiveMQConnectionFactory
connectionFactory) throws Exception {
+        int messagesReceived = 0;
+        for (int i=0; i<expectedMessagesReceived; i++) {
+            String messageId = sendMessage(connectionFactory);
+            Message message = consumer.receive(5000);
+            if ( message!=null ) {
+                messagesReceived++;
+            }
+        }
+        return messagesReceived;
+    }
+
+    protected String sendMessage(ActiveMQConnectionFactory connectionFactory) throws JMSException
{
+        Connection connection = null;
+        try {
+            connection = connectionFactory.createConnection();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            Message message = session.createMessage();
+            producer.send(message);
+            return message.getJMSMessageID();
+        } finally {
+            try {
+                connection.close();
+            } catch (Throwable ignore) {
+            }
+        }
+    }
+
+    protected BrokerService createFirstBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/bugs/amq2764/reconnect-broker1.xml"));
+    }
+
+    protected BrokerService createSecondBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/bugs/amq2764/reconnect-broker2.xml"));
+    }
+    
+    protected ActiveMQConnectionFactory createBrokerOneConnectionFactory() {
+        return new ActiveMQConnectionFactory("vm://broker1");
+    }
+
+    protected ActiveMQConnectionFactory createBrokerTwoConnectionFactory() {
+        return new ActiveMQConnectionFactory("vm://broker2");
+    }
+
+    protected void setUp() throws Exception {
+
+        LOG.info("===============================================================================");
+        LOG.info("Running Test Case: " + getName());
+        LOG.info("===============================================================================");
+
+        destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE");
+
+    }
+
+    protected void tearDown() throws Exception {
+        disposeConsumerConnections();
+        try {
+            stopBrokerOne();
+        } catch (Throwable e) {
+        }
+        try {
+            stopBrokerTwo();
+        } catch (Throwable e) {
+        }
+    }
+
+    protected void disposeConsumerConnections() {
+        for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
+            Connection connection = iter.next();
+            try {
+                connection.close();
+            } catch (Throwable ignore) {
+            }
+        }
+    }
+
+    protected void startBrokerOne() throws Exception {
+        if (brokerOne == null) {
+            brokerOne = createFirstBroker();
+            brokerOne.start();
+        }
+    }
+
+    protected void stopBrokerOne() throws Exception {
+        if (brokerOne != null) {
+            brokerOne.stop();
+            brokerOne = null;
+        }
+    }
+
+    protected void startBrokerTwo() throws Exception {
+        if (brokerTwo == null) {
+            brokerTwo = createSecondBroker();
+            brokerTwo.start();
+        }
+    }
+
+    protected void stopBrokerTwo() throws Exception {
+        if (brokerTwo != null) {
+            brokerTwo.stop();
+            brokerTwo = null;
+        }
+    }
+
+    protected MessageConsumer createConsumer(ActiveMQConnectionFactory consumerConnectionFactory)
throws JMSException {
+        Connection connection = consumerConnectionFactory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        return session.createConsumer(destination);
+    }
+
+    protected AtomicInteger createConsumerCounter(ActiveMQConnectionFactory cf) throws Exception
{
+        final AtomicInteger rc = new AtomicInteger(0);
+        Connection connection = cf.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        ConsumerEventSource source = new ConsumerEventSource(connection, destination);
+        source.setConsumerListener(new ConsumerListener() {
+            public void onConsumerEvent(ConsumerEvent event) {
+                rc.set(event.getConsumerCount());
+            }
+        });
+        source.start();
+
+        return rc;
+    }
+
+    protected void waitForConsumerToArrive(AtomicInteger consumerCounter) throws InterruptedException
{
+        for (int i = 0; i < 100; i++) {
+            if (consumerCounter.get() > 0) {
+                return;
+            }
+            Thread.sleep(100);
+        }
+        fail("The consumer did not arrive.");
+    }
+
+    protected void waitForConsumerToLeave(AtomicInteger consumerCounter) throws InterruptedException
{
+        for (int i = 0; i < 100; i++) {
+            if (consumerCounter.get() == 0) {
+                return;
+            }
+            Thread.sleep(100);
+        }
+        fail("The consumer did not leave.");
+    }
+
+}
+

Propchange: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/bugs/AMQ2764Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/bugs/AMQ2764Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker1.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker1.xml?rev=990107&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker1.xml
(added)
+++ activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker1.xml
Fri Aug 27 11:34:04 2010
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker brokerName="broker1" persistent="false" useShutdownHook="false" useJmx="false"
xmlns="http://activemq.apache.org/schema/core">
+
+    <destinations>
+      <queue physicalName="RECONNECT.TEST.QUEUE"/>
+    </destinations>
+
+    <networkConnectors>
+      <networkConnector uri="static:(http://localhost:61617)" duplex="true">
+        <staticallyIncludedDestinations>
+          <queue physicalName="RECONNECT.TEST.QUEUE"/>
+        </staticallyIncludedDestinations>
+      </networkConnector>
+    </networkConnectors>
+    
+    <transportConnectors>
+      <transportConnector uri="http://localhost:61616"/>
+    </transportConnectors>
+
+  </broker>
+  
+</beans>
+

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker1.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker1.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker1.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Added: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker2.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker2.xml?rev=990107&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker2.xml
(added)
+++ activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker2.xml
Fri Aug 27 11:34:04 2010
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker brokerName="broker2" persistent="false" useShutdownHook="false" useJmx="false"
xmlns="http://activemq.apache.org/schema/core">
+
+    <transportConnectors>
+      <!-- configure a low inactivity monitor check time to ensure all messages are received
quickly -->
+      <transportConnector uri="http://localhost:61617?transport.readCheckTime=4000&amp;transport.initialDelayTime=4000"/>
+    </transportConnectors>
+
+  </broker>
+
+
+</beans>
+

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker2.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker2.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/bugs/amq2764/reconnect-broker2.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml



Mime
View raw message