activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1022071 - in /activemq/trunk: activemq-web-demo/src/main/webapp/ activemq-web-demo/src/main/webapp/js/ activemq-web-demo/src/main/webapp/test/ activemq-web/src/main/java/org/apache/activemq/web/
Date Wed, 13 Oct 2010 11:41:16 GMT
Author: dejanb
Date: Wed Oct 13 11:41:16 2010
New Revision: 1022071

URL: http://svn.apache.org/viewvc?rev=1022071&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2948 - ajax support for multiple clients in
the same session

Added:
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxWebClient.java
Modified:
    activemq/trunk/activemq-web-demo/src/main/webapp/chat.html
    activemq/trunk/activemq-web-demo/src/main/webapp/js/amq.js
    activemq/trunk/activemq-web-demo/src/main/webapp/test/amq_test.html
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java

Modified: activemq/trunk/activemq-web-demo/src/main/webapp/chat.html
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/main/webapp/chat.html?rev=1022071&r1=1022070&r2=1022071&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/main/webapp/chat.html (original)
+++ activemq/trunk/activemq-web-demo/src/main/webapp/chat.html Wed Oct 13 11:41:16 2010
@@ -55,7 +55,7 @@
 		//     org.activemq.Chat.init();
 		// }
 		window.onload = function() {
-			org.activemq.Amq.init({ uri: 'amq', logging: true, timeout: 45 });
+			org.activemq.Amq.init({ uri: 'amq', logging: true, timeout: 45, clientId:(new Date()).getTime().toString() });
 			org.activemq.Chat.init();
 		};
 	</script>    

Modified: activemq/trunk/activemq-web-demo/src/main/webapp/js/amq.js
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/main/webapp/js/amq.js?rev=1022071&r1=1022070&r2=1022071&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/main/webapp/js/amq.js (original)
+++ activemq/trunk/activemq-web-demo/src/main/webapp/js/amq.js Wed Oct 13 11:41:16 2010
@@ -70,6 +70,11 @@ org.activemq.Amq = function() {
 	// message, messageType }.
 	var messageQueue = [];
 
+  // String to distinguish this client from others sharing the same session.
+  // This can occur when multiple browser windows or tabs use amq.js simultaneously.
+  // All windows share the same JSESSIONID, but need to consume messages independently.
+  var clientId = null;
+  
 	/**
 	 * Iterate over the returned XML and for each message in the response, 
 	 * invoke the handler with the matching id.
@@ -138,9 +143,8 @@ org.activemq.Amq = function() {
 		var data = 'timeout=' + timeout * 1000
 				 + '&d=' + now.getTime()
 				 + '&r=' + Math.random();
-				 
 		var options = { method: 'get',
-			data: data,
+			data: addClientId( data ),
 			success: pollHandler,
 			error: pollErrorHandler};
 		adapter.ajax(uri, options);
@@ -158,7 +162,7 @@ org.activemq.Amq = function() {
 		} else {
 			org.activemq.Amq.startBatch();
 			adapter.ajax(uri, { method: 'post',
-				data: buildParams( [message] ),
+				data: addClientId( buildParams( [message] ) ),
 				error: errorHandler,
 				headers: headers,
 				success: org.activemq.Amq.endBatch});
@@ -181,18 +185,33 @@ org.activemq.Amq = function() {
 		}
 		return s.join('');
 	}
+	
+	// add clientId to data if it exists, before passing data to ajax connection adapter.
+	var addClientId = function( data ) {
+		var output = data || '';
+		if( clientId ) {
+			if( output.length > 0 ) {
+				output += '&';
+			}
+			output += 'clientId='+clientId;
+		}
+		return output;
+	}
 
 	return {
+		// optional clientId can be supplied to allow multiple clients (browser windows) within the same session.
 		init : function(options) {
 			connectStatusHandler = options.connectStatusHandler || function(connected){};
 			uri = options.uri || '/amq';
 			pollDelay = typeof options.pollDelay == 'number' ? options.pollDelay : 0;
 			timeout = typeof options.timeout == 'number' ? options.timeout : 25;
 			logging = options.logging;
+			clientId = options.clientId;
 			adapter.init(options);
 			sendPoll();
+			
 		},
-
+		    
 		startBatch : function() {
 			batchInProgress = true;
 		},
@@ -205,7 +224,7 @@ org.activemq.Amq = function() {
 				
 				// we need to ensure that messages which set headers are sent by themselves.
 				// if 2 'listen' messages were sent together, and a 'selector' header were added to one of them,
-				//	 AMQ would add the selector to both 'listen' commands.
+				// AMQ would add the selector to both 'listen' commands.
 				for(i=0;i<messageQueue.length;i++) {
 					// a message with headers should always be sent by itself.	if other messages have been added, send this one later.
 					if ( messageQueue[ i ].headers && messagesToSend.length == 0 ) {
@@ -223,7 +242,7 @@ org.activemq.Amq = function() {
 				adapter.ajax(uri, {
 					method: 'post',
 					headers: outgoingHeaders,
-					data: body,
+					data: addClientId( body ),
 					success: org.activemq.Amq.endBatch, 
 					error: errorHandler});
 			} else {

Modified: activemq/trunk/activemq-web-demo/src/main/webapp/test/amq_test.html
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/main/webapp/test/amq_test.html?rev=1022071&r1=1022070&r2=1022071&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/main/webapp/test/amq_test.html (original)
+++ activemq/trunk/activemq-web-demo/src/main/webapp/test/amq_test.html Wed Oct 13 11:41:16 2010
@@ -282,6 +282,26 @@
       org.activemq.Amq.testPollHandler( response );
       
       assertEqual( 'test message', callbackValue.textContent );
+    }},
+    
+    testClientIdSpecifiedInInitIsAddedToAllAjaxRequests: function() { with( this ) {
+      // need to reset to remove the poll message sent when init() is called in setup().
+      org.activemq.AmqAdapter.reset();
+      org.activemq.Amq.init({ uri: '../amq', timeout: 30, clientId:'uniqueClientName' });
+      
+      org.activemq.Amq.addListener( 'id', 'queue://test', function(){} );
+      org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' );
+      org.activemq.Amq.removeListener( 'id', 'topic://test' );      
+      org.activemq.Amq.endBatch();
+      
+      var requests = org.activemq.AmqAdapter.getRequests();
+      var clientNameRegex = /clientId=uniqueClientName/;
+      
+      assertEqual( 3, requests.length );
+      assertMatch( clientNameRegex, requests[0].options.data );
+      assertMatch( clientNameRegex, requests[1].options.data );
+      assertMatch( clientNameRegex, requests[2].options.data );
+      
     }}
     
   }); 

Added: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java?rev=1022071&view=auto
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java (added)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java Wed Oct 13 11:41:16 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.web;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+
+import org.eclipse.jetty.continuation.Continuation;
+import org.eclipse.jetty.continuation.ContinuationSupport;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.activemq.MessageAvailableListener;
+
+/*
+ * Listen for available messages and wakeup any continuations.
+ */
+public class AjaxListener implements MessageAvailableListener {
+    private static final Log LOG = LogFactory.getLog(AjaxListener.class);
+    
+    private long maximumReadTimeout;
+    private AjaxWebClient client;
+    private long lastAccess;
+    private Continuation continuation;
+
+    AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
+        this.client = client;
+        this.maximumReadTimeout = maximumReadTimeout;
+    }
+
+    public void access() {
+        lastAccess = System.currentTimeMillis();
+    }
+
+    public synchronized void setContinuation(Continuation continuation) {
+        this.continuation = continuation;
+    }
+
+    public synchronized void onMessageAvailable(MessageConsumer consumer) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("message for " + consumer + "continuation=" + continuation);
+        }
+        if (continuation != null) {
+            try {
+                Message message = consumer.receive(10);
+                continuation.setAttribute("message", message);
+                continuation.setAttribute("consumer", consumer);
+            } catch (Exception e) {
+                LOG.error("Error receiving message " + e, e);
+            }
+            continuation.resume();
+        } else if (System.currentTimeMillis() - lastAccess > 2 * this.maximumReadTimeout) {
+            new Thread() {
+                public void run() {
+                    client.closeConsumers();
+                };
+            }.start();
+        }
+    }
+}

Added: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxWebClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxWebClient.java?rev=1022071&view=auto
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxWebClient.java (added)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxWebClient.java Wed Oct 13 11:41:16 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.web;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Date;
+
+import javax.jms.MessageConsumer;
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.activemq.MessageAvailableConsumer;
+
+/*
+ * Collection of all data needed to fulfill requests from a single web client.
+ */
+public class AjaxWebClient extends WebClient {
+    private static final Log LOG = LogFactory.getLog(AjaxWebClient.class);
+    
+    // an instance which has not been accessed in this many milliseconds can be removed.
+    final long expireAfter = 60 * 1000;
+    
+    Map<MessageAvailableConsumer, String> idMap;
+    Map<MessageAvailableConsumer, String> destinationNameMap;
+    AjaxListener listener;
+    Long lastAccessed;
+    
+    public AjaxWebClient( HttpServletRequest request, long maximumReadTimeout ) {
+        // 'id' meaning the first argument to the JavaScript addListener() function.
+        // used to indicate which JS callback should handle a given message.
+        this.idMap = new HashMap<MessageAvailableConsumer, String>();
+        
+        // map consumers to destinations like topic://test, etc.
+        this.destinationNameMap = new HashMap<MessageAvailableConsumer, String>();
+        
+        this.listener = new AjaxListener( this, maximumReadTimeout );
+        
+        this.lastAccessed = this.getNow();
+    }
+    
+    public Map<MessageAvailableConsumer, String> getIdMap() {
+        return this.idMap;
+    }
+    
+    public Map<MessageAvailableConsumer, String> getDestinationNameMap() {
+        return this.destinationNameMap;
+    }
+    
+    public AjaxListener getListener() {
+        return this.listener;
+    }
+    
+    public long getMillisSinceLastAccessed() {
+        return this.getNow() - this.lastAccessed;
+    }
+    
+    public void updateLastAccessed() {
+        this.lastAccessed = this.getNow();
+    }
+    
+    public boolean closeIfExpired() {
+        long now = (new Date()).getTime();
+        boolean returnVal = false;
+        if( this.getMillisSinceLastAccessed() > this.expireAfter ) {
+            this.close();
+            returnVal = true;
+        }
+        return returnVal;
+    }
+    
+    protected long getNow() {
+        return (new Date()).getTime();
+    }
+}

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java?rev=1022071&r1=1022070&r2=1022071&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java Wed Oct 13 11:41:16 2010
@@ -23,6 +23,10 @@ import java.io.StringWriter;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -66,12 +70,14 @@ import org.eclipse.jetty.continuation.Co
  */
 public class MessageListenerServlet extends MessageServletSupport {
     private static final Log LOG = LogFactory.getLog(MessageListenerServlet.class);
-
+    
     private String readTimeoutParameter = "timeout";
     private long defaultReadTimeout = -1;
     private long maximumReadTimeout = 25000;
     private int maximumMessages = 100;
-
+    private Timer clientCleanupTimer = new Timer();
+    private HashMap<String,AjaxWebClient> ajaxWebClients = new HashMap<String,AjaxWebClient>();
+    
     public void init() throws ServletException {
         ServletConfig servletConfig = getServletConfig();
         String name = servletConfig.getInitParameter("defaultReadTimeout");
@@ -86,8 +92,9 @@ public class MessageListenerServlet exte
         if (name != null) {
             maximumMessages = (int)asLong(name);
         }
+        clientCleanupTimer.schedule( new ClientCleaner(), 5000, 60000 );
     }
-
+    
     /**
      * Sends a message to a destination or manage subscriptions. If the the
      * content type of the POST is
@@ -110,14 +117,13 @@ public class MessageListenerServlet exte
     protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
 
         // lets turn the HTTP post into a JMS Message
-
-        WebClient client = WebClient.getWebClient(request);
+        AjaxWebClient client = getAjaxWebClient( request );
         String messageIds = "";
 
         synchronized (client) {
 
             if (LOG.isDebugEnabled()) {
-                LOG.debug("POST client=" + client + " session=" + request.getSession().getId() + " info=" + request.getPathInfo() + " contentType=" + request.getContentType());
+                LOG.debug("POST client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " info=" + request.getPathInfo() + " contentType=" + request.getContentType());
                 // dump(request.getParameterMap());
             }
 
@@ -151,27 +157,27 @@ public class MessageListenerServlet exte
                     messages++;
 
                     if ("listen".equals(type)) {
-                        Listener listener = getListener(request);
-                        Map<MessageAvailableConsumer, String> consumerIdMap = getConsumerIdMap(request);
-                        Map<MessageAvailableConsumer, String> consumerDestinationMap = getConsumerDestinationNameMap(request);
+                        AjaxListener listener = client.getListener();
+                        Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
+                        Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
                         client.closeConsumer(destination); // drop any existing
                         // consumer.
                         MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
 
                         consumer.setAvailableListener(listener);
                         consumerIdMap.put(consumer, message);
-                        consumerDestinationMap.put(consumer, destinationName);
+                        consumerDestinationNameMap.put(consumer, destinationName);
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Subscribed: " + consumer + " to " + destination + " id=" + message);
                         }
                     } else if ("unlisten".equals(type)) {
-                        Map<MessageAvailableConsumer, String> consumerIdMap = getConsumerIdMap(request);
-                        Map consumerDestinationMap = getConsumerDestinationNameMap(request);
+                        Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
+                        Map consumerDestinationNameMap = client.getDestinationNameMap();
                         MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
 
                         consumer.setAvailableListener(null);
                         consumerIdMap.remove(consumer);
-                        consumerDestinationMap.remove(consumer);
+                        consumerDestinationNameMap.remove(consumer);
                         client.closeConsumer(destination);
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Unsubscribed: " + consumer);
@@ -233,9 +239,9 @@ public class MessageListenerServlet exte
      */
     protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
         try {
-            WebClient client = WebClient.getWebClient(request);
+            AjaxWebClient client = getAjaxWebClient(request);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("GET client=" + client + " session=" + request.getSession().getId() + " uri=" + request.getRequestURI() + " query=" + request.getQueryString());
+                LOG.debug("GET client=" + client + " session=" + request.getSession().getId() + " clientId="+ request.getParameter("clientId") + " uri=" + request.getRequestURI() + " query=" + request.getQueryString());
             }
 
             doMessages(client, request, response);
@@ -253,7 +259,7 @@ public class MessageListenerServlet exte
      * @throws ServletException
      * @throws IOException
      */
-    protected void doMessages(WebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException {
+    protected void doMessages(AjaxWebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException {
 
         int messages = 0;
         // This is a poll for any messages
@@ -286,7 +292,11 @@ public class MessageListenerServlet exte
                     }
                 }
             }
-
+            
+            // prepare the response
+            response.setContentType("text/xml");
+            response.setHeader("Cache-Control", "no-cache");
+            
             if (message == null) {
                 Continuation continuation = ContinuationSupport.getContinuation(request);
                 
@@ -308,23 +318,19 @@ public class MessageListenerServlet exte
                 continuation.suspend();
                 
                 // Fetch the listeners
-                Listener listener = getListener(request);
+                AjaxListener listener = client.getListener();
 
                 // register this continuation with our listener.
                 listener.setContinuation(continuation);
                 
                 return;
             }
-            
-            // prepare the responds
-            response.setContentType("text/xml");
-            response.setHeader("Cache-Control", "no-cache");
 
             StringWriter swriter = new StringWriter();
             PrintWriter writer = new PrintWriter(swriter);
-
-            Map<MessageAvailableConsumer, String> consumerIdMap = getConsumerIdMap(request);
-            Map<MessageAvailableConsumer, String> consumerDestinationNameMap = getConsumerDestinationNameMap(request);
+            
+            Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
+            Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
             response.setStatus(HttpServletResponse.SC_OK);
             writer.println("<ajax-response>");
 
@@ -388,40 +394,35 @@ public class MessageListenerServlet exte
         }
         writer.println("</response>");
     }
-
-    protected Listener getListener(HttpServletRequest request) {
-        HttpSession session = request.getSession();
-        Listener listener = (Listener)session.getAttribute("mls.listener");
-        if (listener == null) {
-            listener = new Listener(WebClient.getWebClient(request));
-            session.setAttribute("mls.listener", listener);
-        }
-        return listener;
-    }
-
-    protected Map<MessageAvailableConsumer, String> getConsumerIdMap(HttpServletRequest request) {
+    
+    /*
+     * Return the AjaxWebClient for this session+clientId.
+     * Create one if it does not already exist.
+     */
+    protected AjaxWebClient getAjaxWebClient( HttpServletRequest request ) {
+        long now = (new Date()).getTime();
         HttpSession session = request.getSession(true);
-        Map<MessageAvailableConsumer, String> map = (Map<MessageAvailableConsumer, String>)session.getAttribute("mls.consumerIdMap");
-        if (map == null) {
-            map = new HashMap<MessageAvailableConsumer, String>();
-            session.setAttribute("mls.consumerIdMap", map);
+        
+        String clientId = request.getParameter( "clientId" );      
+        // if user doesn't supply a 'clientId', we'll just use a default.
+        if( clientId == null ) {
+            clientId = "defaultAjaxWebClient";
         }
-        return map;
-    }
-
-    protected Map<MessageAvailableConsumer, String> getConsumerDestinationNameMap(HttpServletRequest request) {
-        HttpSession session = request.getSession(true);
-        Map<MessageAvailableConsumer, String> map = (Map<MessageAvailableConsumer, String>)session.getAttribute("mls.consumerDestinationNameMap");
-        if (map == null) {
-            map = new HashMap<MessageAvailableConsumer, String>();
-            session.setAttribute("mls.consumerDestinationNameMap", map);
+        String sessionKey = session.getId() + '-' + clientId;
+        
+        AjaxWebClient client = ajaxWebClients.get( sessionKey );
+        synchronized (ajaxWebClients) {
+            // create a new AjaxWebClient if one does not already exist for this sessionKey.
+            if( client == null ) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug( "creating new AjaxWebClient in "+sessionKey );
+                }
+                client = new AjaxWebClient( request, maximumReadTimeout );
+                ajaxWebClients.put( sessionKey, client );
+            }
+            client.updateLastAccessed();
         }
-        return map;
-    }
-
-    protected boolean isRicoAjax(HttpServletRequest request) {
-        String rico = request.getParameter("rico");
-        return rico != null && rico.equals("true");
+        return client;
     }
 
     /**
@@ -440,48 +441,34 @@ public class MessageListenerServlet exte
         }
         return answer;
     }
-
+    
     /*
-     * Listen for available messages and wakeup any continuations.
+     * an instance of this class runs every minute (started in init), to clean up old web clients & free resources.
      */
-    private class Listener implements MessageAvailableListener {
-        WebClient client;
-        long lastAccess;
-        Continuation continuation;
-
-        Listener(WebClient client) {
-            this.client = client;
-        }
-
-        public void access() {
-            lastAccess = System.currentTimeMillis();
-        }
-
-        public synchronized void setContinuation(Continuation continuation) {
-            this.continuation = continuation;
-        }
-
-        public synchronized void onMessageAvailable(MessageConsumer consumer) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("message for " + consumer + "continuation=" + continuation);
+    private class ClientCleaner extends TimerTask {
+        public void run() {
+            if( LOG.isDebugEnabled() ) {
+                LOG.debug( "Cleaning up expired web clients." );
             }
-            if (continuation != null) {
-                    try {
-                        Message message = consumer.receive(10);
-                        continuation.setAttribute("message", message);
-                        continuation.setAttribute("consumer", consumer);
-                    } catch (Exception e) {
-                        LOG.error("Error receiving message " + e, e);
+            
+            synchronized( ajaxWebClients ) {
+                Iterator it = ajaxWebClients.entrySet().iterator();
+                while ( it.hasNext() ) {
+                    Map.Entry<String,AjaxWebClient> e = (Map.Entry<String,AjaxWebClient>)it.next();
+                    String key = e.getKey();
+                    AjaxWebClient val = e.getValue();
+                    if ( LOG.isDebugEnabled() ) {
+                        LOG.debug( "AjaxWebClient " + key + " last accessed " + val.getMillisSinceLastAccessed()/1000 + " seconds ago." );
+                    }
+                    // close an expired client and remove it from the ajaxWebClients hash.
+                    if( val.closeIfExpired() ) {
+                        if ( LOG.isDebugEnabled() ) {
+                            LOG.debug( "Removing expired AjaxWebClient " + key );
+                        }
+                        it.remove();
                     }
-                    continuation.resume();
-            } else if (System.currentTimeMillis() - lastAccess > 2 * maximumReadTimeout) {
-                new Thread() {
-                    public void run() {
-                        client.closeConsumers();
-                    };
-                }.start();
+                }
             }
         }
-
     }
 }



Mime
View raw message