tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject svn commit: r723775 [2/5] - in /tomcat/trunk: java/org/apache/juli/ modules/bayeux/ modules/bayeux/java/org/apache/cometd/bayeux/ modules/bayeux/java/org/apache/tomcat/bayeux/ modules/bayeux/java/org/apache/tomcat/bayeux/request/ modules/bayeux/test/or...
Date Fri, 05 Dec 2008 15:57:46 GMT
Modified: tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/BayeuxServlet.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/BayeuxServlet.java?rev=723775&r1=723774&r2=723775&view=diff
==============================================================================
--- tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/BayeuxServlet.java (original)
+++ tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/BayeuxServlet.java Fri Dec  5 07:57:43 2008
@@ -1,236 +1,236 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tomcat.bayeux;
-
-import java.io.IOException;
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.catalina.CometEvent;
-import org.apache.catalina.CometProcessor;
-import org.apache.juli.logging.Log;
-import org.apache.juli.logging.LogFactory;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.cometd.bayeux.Bayeux;
-
-/**
- * 
- * @author Filip Hanik
- * @author Guy Molinari
- * @version 1.0
- */
-public class BayeuxServlet implements CometProcessor {
-
-    /**
-     * Attribute to hold the TomcatBayeux object in the servlet context
-     */
-    public static final String TOMCAT_BAYEUX_ATTR = Bayeux.DOJOX_COMETD_BAYEUX;
-    
-    /**
-     * Logger object
-     */
-    protected static Log log = LogFactory.getLog(BayeuxServlet.class);
-
-    /**
-     * Servlet config - for future use
-     */
-    protected ServletConfig servletConfig;
-    
-    /**
-     * Reference to the global TomcatBayeux object
-     */
-    protected TomcatBayeux tb;
-    
-    /**
-     * Upon servlet destruction, the servlet will clean up the 
-     * TomcatBayeux object and terminate any outstanding events.
-     */
-    public void destroy() {
-        servletConfig = null;
-        //to do, close all outstanding comet events
-        //tb.destroy();
-        tb = null;//TO DO, close everything down
-        
-    }
-    
-    /**
-     * Returns the preconfigured connection timeout.
-     * If no timeout has been configured as a servlet init parameter named <code>timeout</code>
-     * then the default of 2min will be used.
-     * @return int - the timeout for a connection in milliseconds
-     */
-    protected int getTimeout() {
-        String timeoutS = servletConfig.getInitParameter("timeout");
-        int timeout = 120*1000; //2 min
-        try {
-            timeout = Integer.parseInt(timeoutS);
-        }catch (NumberFormatException nfe) {
-            //ignore, we have a default value
-        }
-        return timeout;
-    }
-    
-    protected int getReconnectInterval() {
-        String rs = servletConfig.getInitParameter("reconnectInterval");
-        int rct = 1000; //1 seconds
-        try {
-            rct = Integer.parseInt(rs);
-        }catch (NumberFormatException nfe) {
-            //ignore, we have a default value
-        }
-        return rct;
-    }
-
-
-    public void event(CometEvent cometEvent) throws IOException, ServletException {
-        CometEvent.EventType type = cometEvent.getEventType();
-        if (log.isDebugEnabled()) {
-            log.debug("["+Thread.currentThread().getName()+"] Received Comet Event type="+type+" subtype:"+cometEvent.getEventSubType());
-        }
-        synchronized (cometEvent) {
-            if (type==CometEvent.EventType.BEGIN) {
-                //begin event, set the timeout
-                cometEvent.setTimeout(getTimeout());
-                //checkBayeux(cometEvent); - READ event should always come
-            } else if (type==CometEvent.EventType.READ) {
-                checkBayeux(cometEvent);
-            } else if (type==CometEvent.EventType.ERROR) {
-                tb.remove(cometEvent);
-                cometEvent.close();
-            } else if (type==CometEvent.EventType.END) {
-                tb.remove(cometEvent);
-                cometEvent.close();
-            }//end if
-            
-        }//synchronized
-    }//event
-
-    /**
-     * 
-     * @param cometEvent CometEvent
-     * @return boolean - true if we comet event stays open
-     * @throws IOException
-     * @throws UnsupportedOperationException
-     */
-    protected void checkBayeux(CometEvent cometEvent) throws IOException, UnsupportedOperationException {
-        //we actually have data.
-        //data can be text/json or 
-        if (Bayeux.JSON_CONTENT_TYPE.equals(cometEvent.getHttpServletRequest().getContentType())) {
-            //read and decode the bytes according to content length
-            log.warn("["+Thread.currentThread().getName()+"] JSON encoding not supported, will throw an exception and abort the request.");
-            int contentlength = cometEvent.getHttpServletRequest().getContentLength();
-            throw new UnsupportedOperationException("Decoding "+Bayeux.JSON_CONTENT_TYPE+" not yet implemented.");
-        } else { //GET method or application/x-www-form-urlencoded
-            String message = cometEvent.getHttpServletRequest().getParameter(Bayeux.MESSAGE_PARAMETER);
-            if (log.isTraceEnabled()) {
-                log.trace("["+Thread.currentThread().getName()+"] Received JSON message:"+message);
-            }
-            try {
-                int action = handleBayeux(message, cometEvent);
-                if (log.isDebugEnabled()) {
-                    log.debug("["+Thread.currentThread().getName()+"] Bayeux handling complete, action result="+action);
-                }
-                if (action<=0) {
-                    cometEvent.close();
-                }
-            }catch (Exception x) {
-                x.printStackTrace();
-                tb.remove(cometEvent);
-                log.error(x);
-                cometEvent.close();
-            }
-        }
-    }
-    
-    protected int handleBayeux(String message, CometEvent event) throws IOException, ServletException {
-        int result = 0;
-        if (message==null || message.length()==0) return result;
-        try {
-            BayeuxRequest request = null;
-            //a message can be an array of messages
-            JSONArray jsArray = new JSONArray(message);
-            for (int i = 0; i < jsArray.length(); i++) {
-                JSONObject msg = jsArray.getJSONObject(i);
-                
-                if (log.isDebugEnabled()) {
-                    log.debug("["+Thread.currentThread().getName()+"] Processing bayeux message:"+msg);
-                }
-                request = RequestFactory.getRequest(tb,event,msg);
-                if (log.isDebugEnabled()) {
-                    log.debug("["+Thread.currentThread().getName()+"] Processing bayeux message using request:"+request);
-                }
-                result = request.process(result);
-                if (log.isDebugEnabled()) {
-                    log.debug("["+Thread.currentThread().getName()+"] Processing bayeux message result:"+result);
-                }
-            }
-            if (result>0 && request!=null) {
-                event.getHttpServletRequest().setAttribute(BayeuxRequest.LAST_REQ_ATTR, request);
-                ClientImpl ci = (ClientImpl)tb.getClient(((RequestBase)request).getClientId());
-                ci.addCometEvent(event);
-                if (log.isDebugEnabled()) {
-                    log.debug("["+Thread.currentThread().getName()+"] Done bayeux message added to request attribute");
-                }
-            } else if (result == 0 && request!=null) {
-                RequestBase.deliver(event,(ClientImpl)tb.getClient(((RequestBase)request).getClientId()));
-                if (log.isDebugEnabled()) {
-                    log.debug("["+Thread.currentThread().getName()+"] Done bayeux message, delivered to client");
-                }
-            }
-            
-        }catch (JSONException x) {
-            log.error(x);//to do impl error handling
-            result = -1;
-        }catch (BayeuxException x) {
-            log.error(x); //to do impl error handling
-            result = -1;
-        }
-        return result;
-    }
-
-    public ServletConfig getServletConfig() {
-        return servletConfig;
-    }
-
-    public String getServletInfo() {
-        return "Tomcat/BayeuxServlet/1.0";
-    }
-
-    public void init(ServletConfig servletConfig) throws ServletException {
-        
-        this.servletConfig = servletConfig;
-        ServletContext ctx = servletConfig.getServletContext();
-        if (ctx.getAttribute(TOMCAT_BAYEUX_ATTR)==null)
-            ctx.setAttribute(TOMCAT_BAYEUX_ATTR,new TomcatBayeux());
-        this.tb = (TomcatBayeux)ctx.getAttribute(TOMCAT_BAYEUX_ATTR);
-        tb.setReconnectInterval(getReconnectInterval());
-    }
-
-    public void service(ServletRequest servletRequest, ServletResponse servletResponse) throws ServletException, IOException {
-        if (servletResponse instanceof HttpServletResponse) {
-            ( (HttpServletResponse) servletResponse).sendError(500, "Misconfigured Tomcat server, must be configured to support Comet operations.");
-        } else {
-            throw new ServletException("Misconfigured Tomcat server, must be configured to support Comet operations for the Bayeux protocol.");
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.bayeux;
+
+import java.io.IOException;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.catalina.CometEvent;
+import org.apache.catalina.CometProcessor;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.apache.cometd.bayeux.Bayeux;
+
+/**
+ * 
+ * @author Filip Hanik
+ * @author Guy Molinari
+ * @version 1.0
+ */
+public class BayeuxServlet implements CometProcessor {
+
+    /**
+     * Attribute to hold the TomcatBayeux object in the servlet context
+     */
+    public static final String TOMCAT_BAYEUX_ATTR = Bayeux.DOJOX_COMETD_BAYEUX;
+    
+    /**
+     * Logger object
+     */
+    protected static Log log = LogFactory.getLog(BayeuxServlet.class);
+
+    /**
+     * Servlet config - for future use
+     */
+    protected ServletConfig servletConfig;
+    
+    /**
+     * Reference to the global TomcatBayeux object
+     */
+    protected TomcatBayeux tb;
+    
+    /**
+     * Upon servlet destruction, the servlet will clean up the 
+     * TomcatBayeux object and terminate any outstanding events.
+     */
+    public void destroy() {
+        servletConfig = null;
+        //to do, close all outstanding comet events
+        //tb.destroy();
+        tb = null;//TO DO, close everything down
+        
+    }
+    
+    /**
+     * Returns the preconfigured connection timeout.
+     * If no timeout has been configured as a servlet init parameter named <code>timeout</code>
+     * then the default of 2min will be used.
+     * @return int - the timeout for a connection in milliseconds
+     */
+    protected int getTimeout() {
+        String timeoutS = servletConfig.getInitParameter("timeout");
+        int timeout = 120*1000; //2 min
+        try {
+            timeout = Integer.parseInt(timeoutS);
+        }catch (NumberFormatException nfe) {
+            //ignore, we have a default value
+        }
+        return timeout;
+    }
+    
+    protected int getReconnectInterval() {
+        String rs = servletConfig.getInitParameter("reconnectInterval");
+        int rct = 1000; //1 seconds
+        try {
+            rct = Integer.parseInt(rs);
+        }catch (NumberFormatException nfe) {
+            //ignore, we have a default value
+        }
+        return rct;
+    }
+
+
+    public void event(CometEvent cometEvent) throws IOException, ServletException {
+        CometEvent.EventType type = cometEvent.getEventType();
+        if (log.isDebugEnabled()) {
+            log.debug("["+Thread.currentThread().getName()+"] Received Comet Event type="+type+" subtype:"+cometEvent.getEventSubType());
+        }
+        synchronized (cometEvent) {
+            if (type==CometEvent.EventType.BEGIN) {
+                //begin event, set the timeout
+                cometEvent.setTimeout(getTimeout());
+                //checkBayeux(cometEvent); - READ event should always come
+            } else if (type==CometEvent.EventType.READ) {
+                checkBayeux(cometEvent);
+            } else if (type==CometEvent.EventType.ERROR) {
+                tb.remove(cometEvent);
+                cometEvent.close();
+            } else if (type==CometEvent.EventType.END) {
+                tb.remove(cometEvent);
+                cometEvent.close();
+            }//end if
+            
+        }//synchronized
+    }//event
+
+    /**
+     * 
+     * @param cometEvent CometEvent
+     * @return boolean - true if we comet event stays open
+     * @throws IOException
+     * @throws UnsupportedOperationException
+     */
+    protected void checkBayeux(CometEvent cometEvent) throws IOException, UnsupportedOperationException {
+        //we actually have data.
+        //data can be text/json or 
+        if (Bayeux.JSON_CONTENT_TYPE.equals(cometEvent.getHttpServletRequest().getContentType())) {
+            //read and decode the bytes according to content length
+            log.warn("["+Thread.currentThread().getName()+"] JSON encoding not supported, will throw an exception and abort the request.");
+            int contentlength = cometEvent.getHttpServletRequest().getContentLength();
+            throw new UnsupportedOperationException("Decoding "+Bayeux.JSON_CONTENT_TYPE+" not yet implemented.");
+        } else { //GET method or application/x-www-form-urlencoded
+            String message = cometEvent.getHttpServletRequest().getParameter(Bayeux.MESSAGE_PARAMETER);
+            if (log.isTraceEnabled()) {
+                log.trace("["+Thread.currentThread().getName()+"] Received JSON message:"+message);
+            }
+            try {
+                int action = handleBayeux(message, cometEvent);
+                if (log.isDebugEnabled()) {
+                    log.debug("["+Thread.currentThread().getName()+"] Bayeux handling complete, action result="+action);
+                }
+                if (action<=0) {
+                    cometEvent.close();
+                }
+            }catch (Exception x) {
+                x.printStackTrace();
+                tb.remove(cometEvent);
+                log.error(x);
+                cometEvent.close();
+            }
+        }
+    }
+    
+    protected int handleBayeux(String message, CometEvent event) throws IOException, ServletException {
+        int result = 0;
+        if (message==null || message.length()==0) return result;
+        try {
+            BayeuxRequest request = null;
+            //a message can be an array of messages
+            JSONArray jsArray = new JSONArray(message);
+            for (int i = 0; i < jsArray.length(); i++) {
+                JSONObject msg = jsArray.getJSONObject(i);
+                
+                if (log.isDebugEnabled()) {
+                    log.debug("["+Thread.currentThread().getName()+"] Processing bayeux message:"+msg);
+                }
+                request = RequestFactory.getRequest(tb,event,msg);
+                if (log.isDebugEnabled()) {
+                    log.debug("["+Thread.currentThread().getName()+"] Processing bayeux message using request:"+request);
+                }
+                result = request.process(result);
+                if (log.isDebugEnabled()) {
+                    log.debug("["+Thread.currentThread().getName()+"] Processing bayeux message result:"+result);
+                }
+            }
+            if (result>0 && request!=null) {
+                event.getHttpServletRequest().setAttribute(BayeuxRequest.LAST_REQ_ATTR, request);
+                ClientImpl ci = (ClientImpl)tb.getClient(((RequestBase)request).getClientId());
+                ci.addCometEvent(event);
+                if (log.isDebugEnabled()) {
+                    log.debug("["+Thread.currentThread().getName()+"] Done bayeux message added to request attribute");
+                }
+            } else if (result == 0 && request!=null) {
+                RequestBase.deliver(event,(ClientImpl)tb.getClient(((RequestBase)request).getClientId()));
+                if (log.isDebugEnabled()) {
+                    log.debug("["+Thread.currentThread().getName()+"] Done bayeux message, delivered to client");
+                }
+            }
+            
+        }catch (JSONException x) {
+            log.error(x);//to do impl error handling
+            result = -1;
+        }catch (BayeuxException x) {
+            log.error(x); //to do impl error handling
+            result = -1;
+        }
+        return result;
+    }
+
+    public ServletConfig getServletConfig() {
+        return servletConfig;
+    }
+
+    public String getServletInfo() {
+        return "Tomcat/BayeuxServlet/1.0";
+    }
+
+    public void init(ServletConfig servletConfig) throws ServletException {
+        
+        this.servletConfig = servletConfig;
+        ServletContext ctx = servletConfig.getServletContext();
+        if (ctx.getAttribute(TOMCAT_BAYEUX_ATTR)==null)
+            ctx.setAttribute(TOMCAT_BAYEUX_ATTR,new TomcatBayeux());
+        this.tb = (TomcatBayeux)ctx.getAttribute(TOMCAT_BAYEUX_ATTR);
+        tb.setReconnectInterval(getReconnectInterval());
+    }
+
+    public void service(ServletRequest servletRequest, ServletResponse servletResponse) throws ServletException, IOException {
+        if (servletResponse instanceof HttpServletResponse) {
+            ( (HttpServletResponse) servletResponse).sendError(500, "Misconfigured Tomcat server, must be configured to support Comet operations.");
+        } else {
+            throw new ServletException("Misconfigured Tomcat server, must be configured to support Comet operations for the Bayeux protocol.");
+        }
+    }
+}

Propchange: tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/BayeuxServlet.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/ChannelImpl.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/ChannelImpl.java?rev=723775&r1=723774&r2=723775&view=diff
==============================================================================
--- tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/ChannelImpl.java (original)
+++ tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/ChannelImpl.java Fri Dec  5 07:57:43 2008
@@ -1,189 +1,189 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tomcat.bayeux;
-
-import java.util.LinkedList;
-
-import org.apache.cometd.bayeux.Channel;
-import org.apache.cometd.bayeux.Client;
-import org.apache.cometd.bayeux.DataFilter;
-import java.util.Collections;
-import java.util.List;
-import org.apache.cometd.bayeux.Message;
-import java.util.Iterator;
-import org.apache.juli.logging.Log;
-import org.apache.juli.logging.LogFactory;
-/**
- * 
- * @author Filip Hanik
- * @version 1.0
- */
-public class ChannelImpl implements Channel {
-    
-    protected static Log log = LogFactory.getLog(ChannelImpl.class);
-    
-    /**
-     * The unique id of this channel
-     */
-    protected String id = null;
-    
-    /**
-     * A list of the current subscribers
-     */
-    protected LinkedList<Client> subscribers = new LinkedList<Client>();
-    
-    /**
-     * A list of the current filters
-     */
-    protected LinkedList<DataFilter> filters = new LinkedList<DataFilter>();
-    
-    /**
-     * Is this channel persistent, default value is true
-     */
-    protected boolean persistent = true; 
-    
-    /**
-     * Creates a new channel
-     * @param id String - the id of the channel, can not be null
-     */
-    protected ChannelImpl(String id) {
-        assert id != null;
-        this.id = id;
-    }
-
-    /**
-     * returns the id of this channel
-     * @return String
-     */
-    public String getId() {
-        return id;
-    }
-    
-    /**
-     * Returns true if this channel matches the pattern to its id.
-     * The channel pattern can be a complete name like <code>/service/mychannel</code>
-     * or it can be a wild card pattern like <code>/service/app2/**</code>
-     * @param pattern String according to the Bayeux specification section 2.2.1 Channel Globbing, can not be null.
-     * @return boolean true if the id of this channel matches the pattern
-     */
-    public boolean matches(String pattern) {
-        if (pattern == null)
-            throw new NullPointerException("Channel pattern must not be null.");
-        if (getId().equals(pattern))
-            return true;
-        int wildcardPos = pattern.indexOf("/*");
-        if (wildcardPos == -1)
-            return false;
-        boolean multiSegment = pattern.indexOf("**") != -1;
-        String leadSubstring = pattern.substring(0, wildcardPos);
-        if (leadSubstring == null)
-            return false;
-        if (multiSegment) 
-            return getId().startsWith(leadSubstring);
-        else {
-            if (getId().length() <= wildcardPos + 2)
-                return false;
-            return !(getId().substring(wildcardPos + 2).contains("/"));
-        }
-    }
-
-
-
-    /**
-     * @return returns a non modifiable list of the subscribers for this channel.
-     */
-    public List<Client> getSubscribers() {
-        return Collections.unmodifiableList(subscribers);
-    }
-
-    /**
-     * @return true if the Channel will persist without any subscription.
-     */
-    public boolean isPersistent() {
-        return persistent;
-    }
-    
-    public void publish(Message msg) {
-        publish(new Message[] {msg});
-    }
-
-    public void publish(Message[] msgs) {
-        if (msgs==null) return;
-        MessageImpl[] imsgs = new MessageImpl[msgs.length];
-        for (int i=0; msgs!=null && i<msgs.length; i++) {
-            Message data = msgs[i];
-
-            if (!(data instanceof MessageImpl)) 
-                throw new IllegalArgumentException("Invalid message class, you can only publish messages "+
-                                                   "created through the Bayeux.newMessage() method");
-            if (log.isDebugEnabled()) {
-                log.debug("Publishing message:"+data+" to channel:"+this);
-            }
-            //clone it so that we can set this channel as a reference
-            MessageImpl msg = (MessageImpl)((MessageImpl)data).clone();
-            //this is the channel it was delivered through
-            msg.setChannel(this);
-            //pass through filters
-            for (Iterator<DataFilter> it = filters.iterator(); it.hasNext(); ) {
-                it.next().filter(msg);
-            }
-            imsgs[i] = msg;
-        }
-        //deliver it to the clients
-        for (Iterator<Client> it = subscribers.iterator(); it.hasNext(); ) {
-            ClientImpl c = (ClientImpl)it.next();
-            c.deliverInternal(this,imsgs);
-        }
-        
-    }
-
-    public void setPersistent(boolean persistent) {
-        this.persistent = persistent;
-    }
-
-    public void subscribe(Client subscriber) {
-        if (!subscribers.contains((subscriber))) { 
-            subscribers.addLast(subscriber);
-            ((ClientImpl)subscriber).subscribed(this);
-        }
-    }
-
-    public Client unsubscribe(Client subscriber) {
-        if (subscribers.remove(subscriber)) {
-            ((ClientImpl)subscriber).unsubscribed(this);
-            return subscriber;
-        } else
-            return null;
-    }
-    
-    public void addFilter(DataFilter filter) {
-        if (!filters.contains(filter)) 
-            filters.addLast(filter);
-    }
-
-    public DataFilter removeFilter(DataFilter filter) {
-        if ( filters.remove(filter) ) return filter;
-        else return null;
-    }
-    
-    public String toString() {
-        StringBuffer buf = new StringBuffer(super.toString());
-        buf.append("; channelId=").append(getId());
-        return buf.toString();
-    }
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.bayeux;
+
+import java.util.LinkedList;
+
+import org.apache.cometd.bayeux.Channel;
+import org.apache.cometd.bayeux.Client;
+import org.apache.cometd.bayeux.DataFilter;
+import java.util.Collections;
+import java.util.List;
+import org.apache.cometd.bayeux.Message;
+import java.util.Iterator;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+/**
+ * 
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class ChannelImpl implements Channel {
+    
+    protected static Log log = LogFactory.getLog(ChannelImpl.class);
+    
+    /**
+     * The unique id of this channel
+     */
+    protected String id = null;
+    
+    /**
+     * A list of the current subscribers
+     */
+    protected LinkedList<Client> subscribers = new LinkedList<Client>();
+    
+    /**
+     * A list of the current filters
+     */
+    protected LinkedList<DataFilter> filters = new LinkedList<DataFilter>();
+    
+    /**
+     * Is this channel persistent, default value is true
+     */
+    protected boolean persistent = true; 
+    
+    /**
+     * Creates a new channel
+     * @param id String - the id of the channel, can not be null
+     */
+    protected ChannelImpl(String id) {
+        assert id != null;
+        this.id = id;
+    }
+
+    /**
+     * returns the id of this channel
+     * @return String
+     */
+    public String getId() {
+        return id;
+    }
+    
+    /**
+     * Returns true if this channel matches the pattern to its id.
+     * The channel pattern can be a complete name like <code>/service/mychannel</code>
+     * or it can be a wild card pattern like <code>/service/app2/**</code>
+     * @param pattern String according to the Bayeux specification section 2.2.1 Channel Globbing, can not be null.
+     * @return boolean true if the id of this channel matches the pattern
+     */
+    public boolean matches(String pattern) {
+        if (pattern == null)
+            throw new NullPointerException("Channel pattern must not be null.");
+        if (getId().equals(pattern))
+            return true;
+        int wildcardPos = pattern.indexOf("/*");
+        if (wildcardPos == -1)
+            return false;
+        boolean multiSegment = pattern.indexOf("**") != -1;
+        String leadSubstring = pattern.substring(0, wildcardPos);
+        if (leadSubstring == null)
+            return false;
+        if (multiSegment) 
+            return getId().startsWith(leadSubstring);
+        else {
+            if (getId().length() <= wildcardPos + 2)
+                return false;
+            return !(getId().substring(wildcardPos + 2).contains("/"));
+        }
+    }
+
+
+
+    /**
+     * @return returns a non modifiable list of the subscribers for this channel.
+     */
+    public List<Client> getSubscribers() {
+        return Collections.unmodifiableList(subscribers);
+    }
+
+    /**
+     * @return true if the Channel will persist without any subscription.
+     */
+    public boolean isPersistent() {
+        return persistent;
+    }
+    
+    public void publish(Message msg) {
+        publish(new Message[] {msg});
+    }
+
+    public void publish(Message[] msgs) {
+        if (msgs==null) return;
+        MessageImpl[] imsgs = new MessageImpl[msgs.length];
+        for (int i=0; msgs!=null && i<msgs.length; i++) {
+            Message data = msgs[i];
+
+            if (!(data instanceof MessageImpl)) 
+                throw new IllegalArgumentException("Invalid message class, you can only publish messages "+
+                                                   "created through the Bayeux.newMessage() method");
+            if (log.isDebugEnabled()) {
+                log.debug("Publishing message:"+data+" to channel:"+this);
+            }
+            //clone it so that we can set this channel as a reference
+            MessageImpl msg = (MessageImpl)((MessageImpl)data).clone();
+            //this is the channel it was delivered through
+            msg.setChannel(this);
+            //pass through filters
+            for (Iterator<DataFilter> it = filters.iterator(); it.hasNext(); ) {
+                it.next().filter(msg);
+            }
+            imsgs[i] = msg;
+        }
+        //deliver it to the clients
+        for (Iterator<Client> it = subscribers.iterator(); it.hasNext(); ) {
+            ClientImpl c = (ClientImpl)it.next();
+            c.deliverInternal(this,imsgs);
+        }
+        
+    }
+
+    public void setPersistent(boolean persistent) {
+        this.persistent = persistent;
+    }
+
+    public void subscribe(Client subscriber) {
+        if (!subscribers.contains((subscriber))) { 
+            subscribers.addLast(subscriber);
+            ((ClientImpl)subscriber).subscribed(this);
+        }
+    }
+
+    public Client unsubscribe(Client subscriber) {
+        if (subscribers.remove(subscriber)) {
+            ((ClientImpl)subscriber).unsubscribed(this);
+            return subscriber;
+        } else
+            return null;
+    }
+    
+    public void addFilter(DataFilter filter) {
+        if (!filters.contains(filter)) 
+            filters.addLast(filter);
+    }
+
+    public DataFilter removeFilter(DataFilter filter) {
+        if ( filters.remove(filter) ) return filter;
+        else return null;
+    }
+    
+    public String toString() {
+        StringBuffer buf = new StringBuffer(super.toString());
+        buf.append("; channelId=").append(getId());
+        return buf.toString();
+    }
+
 }
\ No newline at end of file

Propchange: tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/ChannelImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/ClientImpl.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/ClientImpl.java?rev=723775&r1=723774&r2=723775&view=diff
==============================================================================
--- tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/ClientImpl.java (original)
+++ tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/ClientImpl.java Fri Dec  5 07:57:43 2008
@@ -1,279 +1,279 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tomcat.bayeux;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.catalina.CometEvent;
-import org.json.JSONObject;
-import org.apache.cometd.bayeux.Bayeux;
-import org.apache.cometd.bayeux.Client;
-import org.apache.cometd.bayeux.Listener;
-import org.apache.cometd.bayeux.Message;
-import org.apache.juli.logging.Log;
-import org.apache.juli.logging.LogFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.HashMap;
-import java.util.ArrayList;
-
-public class ClientImpl implements Client {
-    
-    public static final int SUPPORT_CALLBACK_POLL = 0x1;
-    public static final int SUPPORT_LONG_POLL = 0x2; 
-
-    public static final String COMET_EVENT_ATTR = "org.apache.cometd.bayeux.client";
-    
-    protected static Log log = LogFactory.getLog(ClientImpl.class);
-
-    protected static LinkedList<Message> EMPTY_LIST = new LinkedList<Message>();
-    /**
-     * queued message for remote clients.
-     */
-    protected LinkedList<Message> messages = null;
-    
-    /**
-     * 
-     */
-    protected Queue<CometEvent> events = new LinkedList<CometEvent>();
-    
-    /**
-     * Unique id representing this client
-     */
-    protected String id;
-    
-    /**
-     * supported connection types, defaults to long-polling
-     */
-    protected int supportedConnTypes = SUPPORT_LONG_POLL | SUPPORT_CALLBACK_POLL;
-    
-    /**
-     * The desired connection type
-     */
-    protected int desirectConnType = SUPPORT_LONG_POLL;
-    
-    /**
-     * Does this client use json-comment-filtered messages
-     */
-    protected boolean useJsonFiltered = false;
-    
-    /**
-     * Same JVM clients, get local=true
-     */
-    protected boolean local;
-    
-    /**
-     * The callback object for local clients
-     */
-    protected Listener listener;
-    
-    protected AtomicInteger nrofsubscriptions = new AtomicInteger(0);
-    
-    protected ClientImpl(String id, boolean local) {
-        this.id = id;
-        this.local = local;
-        if (!local) messages = new LinkedList<Message>();
-    }
-    
-    protected ClientImpl(String id, CometEvent event) {
-        this(id,false);
-        events = new ConcurrentLinkedQueue<CometEvent>();
-        addCometEvent(event);
-    }
-
-    public synchronized void deliver(Message message) {
-        deliverInternal(null,new MessageImpl[] {(MessageImpl)message});
-    }
-    
-    public synchronized void deliver(Message[] message) {
-        deliverInternal(null,message);
-    }
-
-    protected synchronized void deliverInternal(ChannelImpl channel, MessageImpl message) {
-        deliverInternal(channel,new MessageImpl[] {message});
-    }
-
-    protected synchronized void deliverInternal(ChannelImpl channel, Message[] msgs) {
-        if (isLocal()) {
-            //local clients must have a listener
-            ArrayList<Message> list = new ArrayList<Message>();
-            for (int i=0; msgs!=null && i<msgs.length; i++) {
-                //dont deliver to ourselves
-                if (this!=msgs[i].getClient()) list.add(msgs[i]);
-            }
-            if (getListener() != null && list.size()>0) {
-                getListener().deliver(list.toArray(new Message[0]));
-            }
-        } else {
-            for (int i=0; msgs!=null && i<msgs.length; i++) {
-                MessageImpl message = (MessageImpl)msgs[i];
-                if (this==message.getClient()) { 
-                    //dont deliver to ourself
-                    continue;
-                }
-                //we are not implementing forever responses, if the client is connected
-                //then we will fire off the message
-                //first we check to see if we have any existing connections we can piggy back on
-                CometEvent event = events.poll();
-                boolean delivered = false;
-                //TODO TODO - check on thread safety, for writing and for getting last request.
-                if (event!=null) {
-                    synchronized (event) {
-                        RequestBase rq = (RequestBase)event.getHttpServletRequest().getAttribute(RequestBase.LAST_REQ_ATTR);
-                        if (rq!=null) {
-                            Map map = new HashMap();
-                            try {
-                                map.put(Bayeux.CHANNEL_FIELD,message.getChannel().getId());
-                                map.put(Bayeux.DATA_FIELD,message);
-                                JSONObject json = new JSONObject(map);
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Message instantly delivered to remote client["+this+"] message:"+json);
-                                }
-                                rq.addToDeliveryQueue(this, json);
-                                //deliver the batch
-                                if (i==(msgs.length-1)) {
-                                    rq.deliver(event, this);
-                                    event.close(); //todo, figure out a better way, this means only one message gets delivered
-                                    removeCometEvent(event); //and delivered instantly
-                                }
-                                delivered = true;
-                            } catch (Exception x) {
-                                log.error(x);
-                            }
-                        }
-                    }
-                } 
-                if (!delivered) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Message added to queue for remote client["+this+"] message:"+message);
-                    }
-                    //queue the message for the next round
-                    messages.add(message);
-                }
-            }
-        }
-    }
-
-    public String getId() {
-        return this.id;
-    }
-
-    protected Listener getListener() {
-        return listener;
-    }
-
-    public boolean hasMessages() {
-        if (isLocal()) return false;
-        else {
-            return messages.size() > 0;
-        }
-    }
-
-    public boolean isLocal() {
-        return local;
-    }
-
-    public int getSupportedConnTypes() {
-        return supportedConnTypes;
-    }
-
-    public int getDesirectConnType() {
-        return desirectConnType;
-    }
-
-    public boolean useJsonFiltered() {
-        return useJsonFiltered;
-    }
-
-    public void setListener(Listener listener) {
-        this.listener = listener;
-    }
-
-    public void setSupportedConnTypes(int supportedConnTypes) {
-        this.supportedConnTypes = supportedConnTypes;
-    }
-
-    public void setUseJsonFiltered(boolean useJsonFiltered) {
-        this.useJsonFiltered = useJsonFiltered;
-    }
-
-    public void setDesirectConnType(int desirectConnType) {
-        this.desirectConnType = desirectConnType;
-    }
-
-    public boolean supportsCallbackPoll() {
-        return (supportedConnTypes & SUPPORT_CALLBACK_POLL) == SUPPORT_CALLBACK_POLL;
-    }
-
-    public boolean supportsLongPoll() {
-        return (supportedConnTypes & SUPPORT_LONG_POLL) == SUPPORT_LONG_POLL;
-    }
-
-    public synchronized List<Message> takeMessages() {
-        if (isLocal()) return null;
-        if (messages.size()==0) return EMPTY_LIST;
-        List result = new LinkedList(messages);
-        messages.clear();
-        return result;
-    }
-    
-    public String toString() {
-        StringBuffer buf = new StringBuffer(super.toString());
-        buf.append(" id=").append(getId());
-        return buf.toString();
-    }
-    
-    public boolean isSubscribed() {
-        return nrofsubscriptions.get()>0;
-    }
-    
-    protected synchronized boolean addCometEvent(CometEvent event) {
-        boolean result = false;
-        if (!events.contains(event)) {
-            events.add(event);
-            result = true;
-        }
-        event.getHttpServletRequest().setAttribute(COMET_EVENT_ATTR,this);
-        return result;
-    }
-    
-    protected synchronized boolean removeCometEvent(CometEvent event) {
-        boolean result = events.remove(event);
-        event.getHttpServletRequest().removeAttribute(COMET_EVENT_ATTR);
-        return result;
-    }
-    
-    
-    protected void subscribed(ChannelImpl ch) {
-        nrofsubscriptions.addAndGet(1);
-    }
-    
-    protected void unsubscribed(ChannelImpl ch) {
-        nrofsubscriptions.addAndGet(-1);
-    }
-    
-    public void startBatch(){
-        //noop until improved
-    }
-    public void endBatch() {
-        //noop until improved
-    }
-        
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.bayeux;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.catalina.CometEvent;
+import org.json.JSONObject;
+import org.apache.cometd.bayeux.Bayeux;
+import org.apache.cometd.bayeux.Client;
+import org.apache.cometd.bayeux.Listener;
+import org.apache.cometd.bayeux.Message;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashMap;
+import java.util.ArrayList;
+
+public class ClientImpl implements Client {
+    
+    public static final int SUPPORT_CALLBACK_POLL = 0x1;
+    public static final int SUPPORT_LONG_POLL = 0x2; 
+
+    public static final String COMET_EVENT_ATTR = "org.apache.cometd.bayeux.client";
+    
+    protected static Log log = LogFactory.getLog(ClientImpl.class);
+
+    protected static LinkedList<Message> EMPTY_LIST = new LinkedList<Message>();
+    /**
+     * queued message for remote clients.
+     */
+    protected LinkedList<Message> messages = null;
+    
+    /**
+     * 
+     */
+    protected Queue<CometEvent> events = new LinkedList<CometEvent>();
+    
+    /**
+     * Unique id representing this client
+     */
+    protected String id;
+    
+    /**
+     * supported connection types, defaults to long-polling
+     */
+    protected int supportedConnTypes = SUPPORT_LONG_POLL | SUPPORT_CALLBACK_POLL;
+    
+    /**
+     * The desired connection type
+     */
+    protected int desirectConnType = SUPPORT_LONG_POLL;
+    
+    /**
+     * Does this client use json-comment-filtered messages
+     */
+    protected boolean useJsonFiltered = false;
+    
+    /**
+     * Same JVM clients, get local=true
+     */
+    protected boolean local;
+    
+    /**
+     * The callback object for local clients
+     */
+    protected Listener listener;
+    
+    protected AtomicInteger nrofsubscriptions = new AtomicInteger(0);
+    
+    protected ClientImpl(String id, boolean local) {
+        this.id = id;
+        this.local = local;
+        if (!local) messages = new LinkedList<Message>();
+    }
+    
+    protected ClientImpl(String id, CometEvent event) {
+        this(id,false);
+        events = new ConcurrentLinkedQueue<CometEvent>();
+        addCometEvent(event);
+    }
+
+    public synchronized void deliver(Message message) {
+        deliverInternal(null,new MessageImpl[] {(MessageImpl)message});
+    }
+    
+    public synchronized void deliver(Message[] message) {
+        deliverInternal(null,message);
+    }
+
+    protected synchronized void deliverInternal(ChannelImpl channel, MessageImpl message) {
+        deliverInternal(channel,new MessageImpl[] {message});
+    }
+
+    protected synchronized void deliverInternal(ChannelImpl channel, Message[] msgs) {
+        if (isLocal()) {
+            //local clients must have a listener
+            ArrayList<Message> list = new ArrayList<Message>();
+            for (int i=0; msgs!=null && i<msgs.length; i++) {
+                //dont deliver to ourselves
+                if (this!=msgs[i].getClient()) list.add(msgs[i]);
+            }
+            if (getListener() != null && list.size()>0) {
+                getListener().deliver(list.toArray(new Message[0]));
+            }
+        } else {
+            for (int i=0; msgs!=null && i<msgs.length; i++) {
+                MessageImpl message = (MessageImpl)msgs[i];
+                if (this==message.getClient()) { 
+                    //dont deliver to ourself
+                    continue;
+                }
+                //we are not implementing forever responses, if the client is connected
+                //then we will fire off the message
+                //first we check to see if we have any existing connections we can piggy back on
+                CometEvent event = events.poll();
+                boolean delivered = false;
+                //TODO TODO - check on thread safety, for writing and for getting last request.
+                if (event!=null) {
+                    synchronized (event) {
+                        RequestBase rq = (RequestBase)event.getHttpServletRequest().getAttribute(RequestBase.LAST_REQ_ATTR);
+                        if (rq!=null) {
+                            Map map = new HashMap();
+                            try {
+                                map.put(Bayeux.CHANNEL_FIELD,message.getChannel().getId());
+                                map.put(Bayeux.DATA_FIELD,message);
+                                JSONObject json = new JSONObject(map);
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Message instantly delivered to remote client["+this+"] message:"+json);
+                                }
+                                rq.addToDeliveryQueue(this, json);
+                                //deliver the batch
+                                if (i==(msgs.length-1)) {
+                                    rq.deliver(event, this);
+                                    event.close(); //todo, figure out a better way, this means only one message gets delivered
+                                    removeCometEvent(event); //and delivered instantly
+                                }
+                                delivered = true;
+                            } catch (Exception x) {
+                                log.error(x);
+                            }
+                        }
+                    }
+                } 
+                if (!delivered) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Message added to queue for remote client["+this+"] message:"+message);
+                    }
+                    //queue the message for the next round
+                    messages.add(message);
+                }
+            }
+        }
+    }
+
+    public String getId() {
+        return this.id;
+    }
+
+    protected Listener getListener() {
+        return listener;
+    }
+
+    public boolean hasMessages() {
+        if (isLocal()) return false;
+        else {
+            return messages.size() > 0;
+        }
+    }
+
+    public boolean isLocal() {
+        return local;
+    }
+
+    public int getSupportedConnTypes() {
+        return supportedConnTypes;
+    }
+
+    public int getDesirectConnType() {
+        return desirectConnType;
+    }
+
+    public boolean useJsonFiltered() {
+        return useJsonFiltered;
+    }
+
+    public void setListener(Listener listener) {
+        this.listener = listener;
+    }
+
+    public void setSupportedConnTypes(int supportedConnTypes) {
+        this.supportedConnTypes = supportedConnTypes;
+    }
+
+    public void setUseJsonFiltered(boolean useJsonFiltered) {
+        this.useJsonFiltered = useJsonFiltered;
+    }
+
+    public void setDesirectConnType(int desirectConnType) {
+        this.desirectConnType = desirectConnType;
+    }
+
+    public boolean supportsCallbackPoll() {
+        return (supportedConnTypes & SUPPORT_CALLBACK_POLL) == SUPPORT_CALLBACK_POLL;
+    }
+
+    public boolean supportsLongPoll() {
+        return (supportedConnTypes & SUPPORT_LONG_POLL) == SUPPORT_LONG_POLL;
+    }
+
+    public synchronized List<Message> takeMessages() {
+        if (isLocal()) return null;
+        if (messages.size()==0) return EMPTY_LIST;
+        List result = new LinkedList(messages);
+        messages.clear();
+        return result;
+    }
+    
+    public String toString() {
+        StringBuffer buf = new StringBuffer(super.toString());
+        buf.append(" id=").append(getId());
+        return buf.toString();
+    }
+    
+    public boolean isSubscribed() {
+        return nrofsubscriptions.get()>0;
+    }
+    
+    protected synchronized boolean addCometEvent(CometEvent event) {
+        boolean result = false;
+        if (!events.contains(event)) {
+            events.add(event);
+            result = true;
+        }
+        event.getHttpServletRequest().setAttribute(COMET_EVENT_ATTR,this);
+        return result;
+    }
+    
+    protected synchronized boolean removeCometEvent(CometEvent event) {
+        boolean result = events.remove(event);
+        event.getHttpServletRequest().removeAttribute(COMET_EVENT_ATTR);
+        return result;
+    }
+    
+    
+    protected void subscribed(ChannelImpl ch) {
+        nrofsubscriptions.addAndGet(1);
+    }
+    
+    protected void unsubscribed(ChannelImpl ch) {
+        nrofsubscriptions.addAndGet(-1);
+    }
+    
+    public void startBatch(){
+        //noop until improved
+    }
+    public void endBatch() {
+        //noop until improved
+    }
+        
+}

Propchange: tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/ClientImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/HttpError.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/HttpError.java?rev=723775&r1=723774&r2=723775&view=diff
==============================================================================
--- tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/HttpError.java (original)
+++ tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/HttpError.java Fri Dec  5 07:57:43 2008
@@ -1,43 +1,43 @@
-package org.apache.tomcat.bayeux;
-
-public class HttpError {
-    private int code;
-    private String status;
-    private Throwable cause;
-    public HttpError(int code, String status, Throwable cause) {
-        this.code = code;
-        this.status = status;
-        this.cause = cause;
-    }
-
-    public void setCode(int code) {
-        this.code = code;
-    }
-
-    public void setStatus(String status) {
-        this.status = status;
-    }
-
-    public void setCause(Throwable exception) {
-        this.cause = exception;
-    }
-
-    public int getCode() {
-        return code;
-    }
-
-    public String getStatus() {
-        return status;
-    }
-
-    public Throwable getCause() {
-        return cause;
-    }
-
-    public String toString() {
-        if (cause != null)
-            return code + ":" + status + " - [" + cause + "]";
-        else
-            return code + ":" + status;
-    }
-}
+package org.apache.tomcat.bayeux;
+
+public class HttpError {
+    private int code;
+    private String status;
+    private Throwable cause;
+    public HttpError(int code, String status, Throwable cause) {
+        this.code = code;
+        this.status = status;
+        this.cause = cause;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    public void setCause(Throwable exception) {
+        this.cause = exception;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public Throwable getCause() {
+        return cause;
+    }
+
+    public String toString() {
+        if (cause != null)
+            return code + ":" + status + " - [" + cause + "]";
+        else
+            return code + ":" + status;
+    }
+}

Propchange: tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/HttpError.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/MessageImpl.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/MessageImpl.java?rev=723775&r1=723774&r2=723775&view=diff
==============================================================================
--- tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/MessageImpl.java (original)
+++ tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/MessageImpl.java Fri Dec  5 07:57:43 2008
@@ -1,80 +1,80 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tomcat.bayeux;
-
-import java.util.HashMap;
-
-import org.apache.cometd.bayeux.Channel;
-import org.apache.cometd.bayeux.Client;
-import org.apache.cometd.bayeux.Message;
-
-public class MessageImpl extends HashMap<String,Object> implements Message {
-    
-    protected Channel channel;
-    protected Client client;
-    protected String id;
-    private long TTL = 1000*60*5; //5min is the default TTL for a message
-    protected long creationTime = System.currentTimeMillis();
-
-    public Object clone() {
-        MessageImpl copy = new MessageImpl(id);
-        copy.putAll(this);
-        copy.channel = channel;
-        copy.client = client;
-        copy.id = id;
-        copy.creationTime = creationTime;
-        copy.TTL = TTL;
-        return copy;
-    }
-
-    protected MessageImpl(String id) {
-        assert id != null;
-        this.id = id;
-    }
-
-    public Channel getChannel() {
-        return channel;
-    }
-
-    public Client getClient() {
-        return client;
-    }
-
-    public long getCreationTime() {
-        return creationTime;
-    }
-
-    public long getTTL() {
-        return TTL;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    protected void setChannel(Channel channel) {
-        this.channel = channel;
-    }
-
-    protected void setClient(Client client) {
-        this.client = client;
-    }
-
-    public void setTTL(long TTL) {
-        this.TTL = TTL;
-    }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.bayeux;
+
+import java.util.HashMap;
+
+import org.apache.cometd.bayeux.Channel;
+import org.apache.cometd.bayeux.Client;
+import org.apache.cometd.bayeux.Message;
+
+public class MessageImpl extends HashMap<String,Object> implements Message {
+    
+    protected Channel channel;
+    protected Client client;
+    protected String id;
+    private long TTL = 1000*60*5; //5min is the default TTL for a message
+    protected long creationTime = System.currentTimeMillis();
+
+    public Object clone() {
+        MessageImpl copy = new MessageImpl(id);
+        copy.putAll(this);
+        copy.channel = channel;
+        copy.client = client;
+        copy.id = id;
+        copy.creationTime = creationTime;
+        copy.TTL = TTL;
+        return copy;
+    }
+
+    protected MessageImpl(String id) {
+        assert id != null;
+        this.id = id;
+    }
+
+    public Channel getChannel() {
+        return channel;
+    }
+
+    public Client getClient() {
+        return client;
+    }
+
+    public long getCreationTime() {
+        return creationTime;
+    }
+
+    public long getTTL() {
+        return TTL;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    protected void setChannel(Channel channel) {
+        this.channel = channel;
+    }
+
+    protected void setClient(Client client) {
+        this.client = client;
+    }
+
+    public void setTTL(long TTL) {
+        this.TTL = TTL;
+    }
 }
\ No newline at end of file

Propchange: tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/MessageImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/RequestBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/RequestBase.java?rev=723775&r1=723774&r2=723775&view=diff
==============================================================================
--- tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/RequestBase.java (original)
+++ tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/RequestBase.java Fri Dec  5 07:57:43 2008
@@ -1,259 +1,259 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tomcat.bayeux;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.Date;
-import java.text.SimpleDateFormat;
-import javax.servlet.ServletException;
-
-import org.apache.catalina.CometEvent;
-import org.apache.tomcat.bayeux.HttpError;
-
-import org.apache.juli.logging.Log;
-import org.apache.juli.logging.LogFactory;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.apache.cometd.bayeux.Bayeux;
-import org.apache.cometd.bayeux.Message;
-
-/**
- * Common functionality and member variables for all Bayeux requests.
- *
- * @author Guy A. Molinari
- * @author Filip Hanik
- * @version 0.9
- *
- */
-public abstract class RequestBase implements BayeuxRequest {
-    
-    protected static final SimpleDateFormat timestampFmt =
-        new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
-    static {
-        timestampFmt.setTimeZone(TimeZone.getTimeZone("GMT"));
-    }
-    //message properties, combined for all messages
-    protected TomcatBayeux tomcatBayeux;
-    protected String channel;
-    protected String id;
-    protected String clientId;
-    protected String version = null;
-    protected String[] suppConnTypes = null;
-    protected int suppConnTypesFlag = 0;
-    protected int desiredConnTypeFlag = 0;
-    protected String minVersion = null;
-    protected String subscription = null;
-    protected String data = null;
-    protected String conType = null;
-    protected LinkedHashMap<String, Object> ext = new LinkedHashMap<String, Object> ();
-
-    
-    protected CometEvent event;
-    
-    protected HashMap<String, Object> response = null;
-    
-    protected static Log log = LogFactory.getLog(RequestBase.class);
-    
-    protected int reconnectInterval = 1000;
-    
-    protected RequestBase(TomcatBayeux tb, CometEvent event, JSONObject jsReq) throws JSONException {
-        this.tomcatBayeux = tb;
-        this.event = event;
-        channel = jsReq.optString(Bayeux.CHANNEL_FIELD);
-        id = jsReq.optString(Bayeux.ID_FIELD);
-        clientId = jsReq.optString(Bayeux.CLIENT_FIELD);
-        version = jsReq.optString(Bayeux.VERSION_FIELD);
-        minVersion = jsReq.optString(Bayeux.MIN_VERSION_FIELD);
-        conType = jsReq.optString(Bayeux.CONNECTION_TYPE_FIELD);
-        subscription = jsReq.optString(Bayeux.SUBSCRIPTION_FIELD);
-        data = jsReq.optString(Bayeux.DATA_FIELD);
-        reconnectInterval = tb.getReconnectInterval();
-        if (jsReq.has(Bayeux.EXT_FIELD)) {
-            JSONObject jext = jsReq.getJSONObject(Bayeux.EXT_FIELD);
-            for (Iterator<String> i = jext.keys(); i.hasNext(); ) {
-                String key = i.next();
-                ext.put(key, jext.get(key));
-            }//for
-        }//end if
-        
-        if (jsReq.has(Bayeux.SUPP_CONNECTION_TYPE_FIELD)) {
-            JSONArray types = jsReq.getJSONArray(Bayeux.SUPP_CONNECTION_TYPE_FIELD);
-            suppConnTypes = new String[types.length()];
-            for (int i = 0; i < types.length(); i++) {
-                suppConnTypes[i] = types.getString(i);
-                if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(suppConnTypes[i]))
-                    suppConnTypesFlag = suppConnTypesFlag|ClientImpl.SUPPORT_CALLBACK_POLL;
-                else if (Bayeux.TRANSPORT_LONG_POLL.equals(suppConnTypes[i]))
-                    suppConnTypesFlag = suppConnTypesFlag|ClientImpl.SUPPORT_LONG_POLL;
-            }//for
-        }//end if
-
-        if (conType!=null) {
-            if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(conType))
-                desiredConnTypeFlag = ClientImpl.SUPPORT_CALLBACK_POLL;
-            else if (Bayeux.TRANSPORT_LONG_POLL.equals(conType))
-                desiredConnTypeFlag = ClientImpl.SUPPORT_LONG_POLL;
-        }//end if
-        
-        //due to the fact that the javascript doesn't send up a required field
-        //we have to fake it
-        suppConnTypesFlag = ClientImpl.SUPPORT_CALLBACK_POLL | ClientImpl.SUPPORT_LONG_POLL;
-
-    }
-
-    public HttpError validate() {
-        HttpError result = null;
-//        if (clientId == null) {
-//            result = new HttpError(401,"No Client ID.", null);
-//        }
-        return result;
-    }
-
-    public TomcatBayeux getTomcatBayeux() {
-        return tomcatBayeux;
-    }
-
-    public String getChannel() {
-        return channel;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public String getClientId() {
-        return clientId;
-    }
-
-    public LinkedHashMap getExt() {
-        return ext;
-    }
-
-    public CometEvent getEvent() {
-        return event;
-    }
-    
-    protected static void deliver(CometEvent event, ClientImpl to) throws IOException, ServletException, BayeuxException {
-        JSONArray jarray = getJSONArray(event,true);
-        if ( jarray == null ) throw new BayeuxException("No message to send!");
-        String jsonstring = jarray.toString();
-        if (log.isDebugEnabled()) {
-            log.debug("["+Thread.currentThread().getName()+"] Delivering message to[" + to + "] message:" + jsonstring);
-        }
-
-        if (to!=null) {
-            if (to.useJsonFiltered()) {
-                if (!event.getHttpServletResponse().isCommitted()) event.getHttpServletResponse().setContentType("text/json-comment-filtered");
-            }else {	
-                if (!event.getHttpServletResponse().isCommitted()) event.getHttpServletResponse().setContentType("text/json");
-            }
-        }
-
-        PrintWriter out = event.getHttpServletResponse().getWriter();
-        if (to==null) {
-            //do nothing
-        }else if ( (to.getDesirectConnType() == 0 && to.supportsLongPoll()) || to.getDesirectConnType() == ClientImpl.SUPPORT_LONG_POLL) {
-            if (to.useJsonFiltered())
-                out.print("/*");
-        } else if ( (to.getDesirectConnType() == 0 && to.supportsCallbackPoll()) || to.getDesirectConnType() == ClientImpl.SUPPORT_CALLBACK_POLL) {
-            String jsonp = event.getHttpServletRequest().getParameter(Bayeux.JSONP_PARAMETER);
-            if (jsonp == null)
-                jsonp = Bayeux.JSONP_DEFAULT_NAME;
-            out.print(jsonp);
-            out.print('(');
-        } else {
-            throw new BayeuxException("Client doesn't support any appropriate connection type.");
-        }
-        out.print(jsonstring);
-        if ( to == null ) {
-            //do nothing
-        } else if ( (to.getDesirectConnType() == 0 && to.supportsLongPoll()) || to.getDesirectConnType() == ClientImpl.SUPPORT_LONG_POLL) {
-            if (to.useJsonFiltered())
-                out.print("*/");
-        } else if ( (to.getDesirectConnType() == 0 && to.supportsCallbackPoll()) || to.getDesirectConnType() == ClientImpl.SUPPORT_CALLBACK_POLL) {
-            out.print(");");
-        } 
-        out.flush();
-        event.getHttpServletResponse().flushBuffer();
-
-        
-    }
-
-    protected static JSONArray getJSONArray(CometEvent event, boolean nullok) {
-        synchronized(event) {
-            JSONArray jarray = (JSONArray) event.getHttpServletRequest().getAttribute(JSON_MSG_ARRAY);
-            if (jarray == null && (!nullok)) {
-                jarray = new JSONArray();
-                event.getHttpServletRequest().setAttribute(JSON_MSG_ARRAY, jarray);
-            }
-            return jarray;
-        }
-    }
-
-    protected JSONArray getJSONArray() {
-        return getJSONArray(event,false);
-    }
-
-    protected void addToDeliveryQueue(ClientImpl to, JSONObject msg) throws IOException, ServletException, BayeuxException {
-        synchronized (event) {
-            getJSONArray().put(msg);
-        }
-    }
-    
-    protected void flushMessages(ClientImpl client) throws BayeuxException {
-        List<Message> msgs = client.takeMessages();
-        synchronized (event) {
-            try {
-                for (Iterator<Message> it = msgs.iterator(); it.hasNext(); ){
-                    MessageImpl msg = (MessageImpl)it.next();
-                    Map map = new HashMap();
-                    map.put(Bayeux.CHANNEL_FIELD,msg.getChannel().getId());
-                    if (msg.getClient()!=null) map.put(Bayeux.CLIENT_FIELD,msg.getClient().getId());
-                    map.put(Bayeux.DATA_FIELD,msg);
-                    JSONObject obj = new JSONObject(map);
-                    addToDeliveryQueue(client, obj);
-                }
-            } catch (ServletException x) {
-                throw new BayeuxException(x);
-            } catch (IOException x) {
-                throw new BayeuxException(x);
-            }
-        }
-    }
-    
-    public int process(int prevops) throws BayeuxException {
-        event.getHttpServletRequest().setAttribute(CURRENT_REQ_ATTR,this);
-        return prevops;
-    }
-    
-    public int getReconnectInterval() {
-        return reconnectInterval;
-    }
-
-    public String getTimeStamp() {
-        return timestampFmt.format(new Date(System.currentTimeMillis()));
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.bayeux;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.Date;
+import java.text.SimpleDateFormat;
+import javax.servlet.ServletException;
+
+import org.apache.catalina.CometEvent;
+import org.apache.tomcat.bayeux.HttpError;
+
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.apache.cometd.bayeux.Bayeux;
+import org.apache.cometd.bayeux.Message;
+
+/**
+ * Common functionality and member variables for all Bayeux requests.
+ *
+ * @author Guy A. Molinari
+ * @author Filip Hanik
+ * @version 0.9
+ *
+ */
+public abstract class RequestBase implements BayeuxRequest {
+
+    /**
+     * Formatter behind {@link #getTimeStamp()}, fixed to GMT. SimpleDateFormat
+     * is not thread-safe: hold this object's monitor while formatting with it.
+     */
+    protected static final SimpleDateFormat timestampFmt =
+        new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+    static {
+        timestampFmt.setTimeZone(TimeZone.getTimeZone("GMT"));
+    }
+    //message properties, combined for all messages
+    protected TomcatBayeux tomcatBayeux;
+    protected String channel;
+    protected String id;
+    protected String clientId;
+    protected String version = null;
+    protected String[] suppConnTypes = null;
+    protected int suppConnTypesFlag = 0;
+    protected int desiredConnTypeFlag = 0;
+    protected String minVersion = null;
+    protected String subscription = null;
+    protected String data = null;
+    protected String conType = null;
+    protected LinkedHashMap<String, Object> ext = new LinkedHashMap<String, Object> ();
+
+    /** Comet event this request arrived on; also the monitor guarding the outgoing message array. */
+    protected CometEvent event;
+
+    protected HashMap<String, Object> response = null;
+
+    protected static Log log = LogFactory.getLog(RequestBase.class);
+
+    /** Reconnect interval; replaced in the constructor by the value configured on the TomcatBayeux. */
+    protected int reconnectInterval = 1000;
+
+    /**
+     * Extracts the fields shared by all Bayeux requests from the JSON message.
+     *
+     * @param tb    the Bayeux instance; also supplies the reconnect interval
+     * @param event the comet event carrying the request
+     * @param jsReq the parsed JSON message
+     * @throws JSONException if the ext object or the supported connection
+     *                       types array cannot be read
+     */
+    protected RequestBase(TomcatBayeux tb, CometEvent event, JSONObject jsReq) throws JSONException {
+        this.tomcatBayeux = tb;
+        this.event = event;
+        channel = jsReq.optString(Bayeux.CHANNEL_FIELD);
+        id = jsReq.optString(Bayeux.ID_FIELD);
+        clientId = jsReq.optString(Bayeux.CLIENT_FIELD);
+        version = jsReq.optString(Bayeux.VERSION_FIELD);
+        minVersion = jsReq.optString(Bayeux.MIN_VERSION_FIELD);
+        conType = jsReq.optString(Bayeux.CONNECTION_TYPE_FIELD);
+        subscription = jsReq.optString(Bayeux.SUBSCRIPTION_FIELD);
+        data = jsReq.optString(Bayeux.DATA_FIELD);
+        reconnectInterval = tb.getReconnectInterval();
+
+        // copy the ext object entry by entry, keeping the iteration order of its keys
+        if (jsReq.has(Bayeux.EXT_FIELD)) {
+            JSONObject jext = jsReq.getJSONObject(Bayeux.EXT_FIELD);
+            for (Iterator<String> i = jext.keys(); i.hasNext(); ) {
+                String key = i.next();
+                ext.put(key, jext.get(key));
+            }
+        }
+
+        // remember the advertised transports, both by name and as a bit mask
+        if (jsReq.has(Bayeux.SUPP_CONNECTION_TYPE_FIELD)) {
+            JSONArray types = jsReq.getJSONArray(Bayeux.SUPP_CONNECTION_TYPE_FIELD);
+            suppConnTypes = new String[types.length()];
+            for (int i = 0; i < types.length(); i++) {
+                suppConnTypes[i] = types.getString(i);
+                if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(suppConnTypes[i]))
+                    suppConnTypesFlag = suppConnTypesFlag|ClientImpl.SUPPORT_CALLBACK_POLL;
+                else if (Bayeux.TRANSPORT_LONG_POLL.equals(suppConnTypes[i]))
+                    suppConnTypesFlag = suppConnTypesFlag|ClientImpl.SUPPORT_LONG_POLL;
+            }
+        }
+
+        if (conType!=null) {
+            if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(conType))
+                desiredConnTypeFlag = ClientImpl.SUPPORT_CALLBACK_POLL;
+            else if (Bayeux.TRANSPORT_LONG_POLL.equals(conType))
+                desiredConnTypeFlag = ClientImpl.SUPPORT_LONG_POLL;
+        }
+
+        //due to the fact that the javascript doesn't send up a required field
+        //we have to fake it (this overrides whatever mask was collected above)
+        suppConnTypesFlag = ClientImpl.SUPPORT_CALLBACK_POLL | ClientImpl.SUPPORT_LONG_POLL;
+
+    }
+
+    /**
+     * Validates the request. This base implementation accepts everything;
+     * the client id check is currently disabled.
+     *
+     * @return always null here, meaning no error
+     */
+    public HttpError validate() {
+        HttpError result = null;
+//        if (clientId == null) {
+//            result = new HttpError(401,"No Client ID.", null);
+//        }
+        return result;
+    }
+
+    public TomcatBayeux getTomcatBayeux() {
+        return tomcatBayeux;
+    }
+
+    public String getChannel() {
+        return channel;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    /** @return the live map of ext entries read from the request (not a copy) */
+    public LinkedHashMap getExt() {
+        return ext;
+    }
+
+    public CometEvent getEvent() {
+        return event;
+    }
+
+    /**
+     * Writes the JSON array queued on the event's request to the response and
+     * flushes it. With a client given, the content type is set (unless the
+     * response is already committed) and the payload is wrapped for the
+     * client's transport: an optional comment filter for long-poll, a JSONP
+     * call for callback-poll. With a null client the array is written bare.
+     *
+     * @param event the comet event whose response receives the payload
+     * @param to    the receiving client, or null
+     * @throws IOException     if writing to the response fails
+     * @throws ServletException declared for callers; not thrown directly here
+     * @throws BayeuxException if nothing is queued, or the client matches
+     *                         neither long-poll nor callback-poll
+     */
+    protected static void deliver(CometEvent event, ClientImpl to) throws IOException, ServletException, BayeuxException {
+        JSONArray jarray = getJSONArray(event,true);
+        if ( jarray == null ) throw new BayeuxException("No message to send!");
+        String jsonstring = jarray.toString();
+        if (log.isDebugEnabled()) {
+            log.debug("["+Thread.currentThread().getName()+"] Delivering message to[" + to + "] message:" + jsonstring);
+        }
+
+        // Client settings are read once so that the content type and the
+        // opening and closing wrappers can never disagree with each other.
+        boolean filtered = to != null && to.useJsonFiltered();
+        if (to != null && !event.getHttpServletResponse().isCommitted()) {
+            event.getHttpServletResponse().setContentType(filtered ? "text/json-comment-filtered" : "text/json");
+        }
+
+        PrintWriter out = event.getHttpServletResponse().getWriter();
+        boolean longPoll = false;
+        boolean callbackPoll = false;
+        if (to != null) {
+            // an explicit desired type wins; with none set (0) long-poll is tried before callback-poll
+            longPoll = (to.getDesirectConnType() == 0 && to.supportsLongPoll())
+                || to.getDesirectConnType() == ClientImpl.SUPPORT_LONG_POLL;
+            callbackPoll = !longPoll
+                && ((to.getDesirectConnType() == 0 && to.supportsCallbackPoll())
+                    || to.getDesirectConnType() == ClientImpl.SUPPORT_CALLBACK_POLL);
+            if (!longPoll && !callbackPoll) {
+                throw new BayeuxException("Client doesn't support any appropriate connection type.");
+            }
+        }
+
+        if (longPoll) {
+            if (filtered)
+                out.print("/*");
+        } else if (callbackPoll) {
+            // NOTE(review): the callback name comes straight from a request
+            // parameter and is echoed unvalidated - consider restricting it
+            // to identifier characters.
+            String jsonp = event.getHttpServletRequest().getParameter(Bayeux.JSONP_PARAMETER);
+            if (jsonp == null)
+                jsonp = Bayeux.JSONP_DEFAULT_NAME;
+            out.print(jsonp);
+            out.print('(');
+        }
+        out.print(jsonstring);
+        if (longPoll) {
+            if (filtered)
+                out.print("*/");
+        } else if (callbackPoll) {
+            out.print(");");
+        }
+        out.flush();
+        event.getHttpServletResponse().flushBuffer();
+    }
+
+    /**
+     * Looks up the JSON array stored on the event's request under the
+     * JSON_MSG_ARRAY attribute.
+     *
+     * @param event  the comet event whose request holds the array
+     * @param nullok when false, a missing array is created and stored;
+     *               when true, a missing array yields null
+     * @return the array, or null if absent and nullok is true
+     */
+    protected static JSONArray getJSONArray(CometEvent event, boolean nullok) {
+        synchronized(event) {
+            JSONArray jarray = (JSONArray) event.getHttpServletRequest().getAttribute(JSON_MSG_ARRAY);
+            if (jarray == null && (!nullok)) {
+                jarray = new JSONArray();
+                event.getHttpServletRequest().setAttribute(JSON_MSG_ARRAY, jarray);
+            }
+            return jarray;
+        }
+    }
+
+    /** @return the JSON array for this request's event, created on first use */
+    protected JSONArray getJSONArray() {
+        return getJSONArray(event,false);
+    }
+
+    /**
+     * Appends a message to the JSON array kept on this request's event.
+     *
+     * @param to  the target client (not consulted by this implementation)
+     * @param msg the message to append
+     */
+    protected void addToDeliveryQueue(ClientImpl to, JSONObject msg) throws IOException, ServletException, BayeuxException {
+        synchronized (event) {
+            getJSONArray().put(msg);
+        }
+    }
+
+    /**
+     * Takes the client's pending messages and queues each one on this
+     * request's event as a JSON object holding the channel id, the client id
+     * (when the message has a client) and the message itself as data.
+     *
+     * @param client the client whose pending messages are taken
+     * @throws BayeuxException wrapping any ServletException or IOException
+     *                         raised while queueing
+     */
+    protected void flushMessages(ClientImpl client) throws BayeuxException {
+        List<Message> msgs = client.takeMessages();
+        synchronized (event) {
+            try {
+                for (Message pending : msgs) {
+                    MessageImpl msg = (MessageImpl) pending;
+                    Map<String, Object> map = new HashMap<String, Object>();
+                    map.put(Bayeux.CHANNEL_FIELD,msg.getChannel().getId());
+                    if (msg.getClient()!=null) map.put(Bayeux.CLIENT_FIELD,msg.getClient().getId());
+                    map.put(Bayeux.DATA_FIELD,msg);
+                    JSONObject obj = new JSONObject(map);
+                    addToDeliveryQueue(client, obj);
+                }
+            } catch (ServletException x) {
+                throw new BayeuxException(x);
+            } catch (IOException x) {
+                throw new BayeuxException(x);
+            }
+        }
+    }
+
+    /**
+     * Stores this request on the servlet request under the CURRENT_REQ_ATTR
+     * attribute.
+     *
+     * @param prevops the operations value passed in by the caller
+     * @return prevops, unchanged
+     */
+    public int process(int prevops) throws BayeuxException {
+        event.getHttpServletRequest().setAttribute(CURRENT_REQ_ATTR,this);
+        return prevops;
+    }
+
+    public int getReconnectInterval() {
+        return reconnectInterval;
+    }
+
+    /**
+     * @return the current time formatted as yyyy-MM-dd'T'HH:mm:ss.SSSZ in GMT
+     */
+    public String getTimeStamp() {
+        // SimpleDateFormat mutates internal state while formatting, so access
+        // to the shared instance is serialized.
+        synchronized (timestampFmt) {
+            return timestampFmt.format(new Date(System.currentTimeMillis()));
+        }
+    }
+
+}

Propchange: tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/RequestBase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/RequestFactory.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/RequestFactory.java?rev=723775&r1=723774&r2=723775&view=diff
==============================================================================
--- tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/RequestFactory.java (original)
+++ tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/RequestFactory.java Fri Dec  5 07:57:43 2008
@@ -1,48 +1,48 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tomcat.bayeux;
-
-import org.json.JSONObject;
-import org.apache.tomcat.bayeux.request.MetaHandshakeRequest;
-import org.apache.catalina.CometEvent;
-import org.json.JSONException;
-import org.apache.tomcat.bayeux.request.MetaConnectRequest;
-import org.apache.tomcat.bayeux.request.MetaDisconnectRequest;
-import org.apache.tomcat.bayeux.request.MetaSubscribeRequest;
-import org.apache.tomcat.bayeux.request.MetaUnsubscribeRequest;
-import org.apache.tomcat.bayeux.request.PublishRequest;
-import org.apache.cometd.bayeux.Bayeux;
-
-public class RequestFactory {
-
-    public static BayeuxRequest getRequest(TomcatBayeux tomcatBayeux, CometEvent event, JSONObject msg) throws JSONException {
-        String channel = msg.optString(Bayeux.CHANNEL_FIELD);
-        if (Bayeux.META_HANDSHAKE.equals(channel)) {
-            return new MetaHandshakeRequest(tomcatBayeux,event,msg);
-        }else if (Bayeux.META_CONNECT.equals(channel)) {
-            return new MetaConnectRequest(tomcatBayeux,event,msg);
-        }else if (Bayeux.META_DISCONNECT.equals(channel)) {
-            return new MetaDisconnectRequest(tomcatBayeux,event,msg);
-        }else if (Bayeux.META_SUBSCRIBE.equals(channel)) {
-            return new MetaSubscribeRequest(tomcatBayeux,event,msg);
-        }else if (Bayeux.META_UNSUBSCRIBE.equals(channel)) {
-            return new MetaUnsubscribeRequest(tomcatBayeux,event,msg);
-        } else {
-            return new PublishRequest(tomcatBayeux,event,msg);
-        }
-    }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.bayeux;
+
+import org.json.JSONObject;
+import org.apache.tomcat.bayeux.request.MetaHandshakeRequest;
+import org.apache.catalina.CometEvent;
+import org.json.JSONException;
+import org.apache.tomcat.bayeux.request.MetaConnectRequest;
+import org.apache.tomcat.bayeux.request.MetaDisconnectRequest;
+import org.apache.tomcat.bayeux.request.MetaSubscribeRequest;
+import org.apache.tomcat.bayeux.request.MetaUnsubscribeRequest;
+import org.apache.tomcat.bayeux.request.PublishRequest;
+import org.apache.cometd.bayeux.Bayeux;
+
+public class RequestFactory {
+
+    /**
+     * Builds the request object matching the channel named in the message.
+     * Each meta channel (handshake, connect, disconnect, subscribe,
+     * unsubscribe) has its own request type; any other channel value yields
+     * a publish request.
+     *
+     * @param tomcatBayeux the Bayeux instance handed to the new request
+     * @param event        the comet event the message arrived on
+     * @param msg          the parsed JSON message
+     * @return the request created for the message
+     * @throws JSONException if the request constructor fails to read the message
+     */
+    public static BayeuxRequest getRequest(TomcatBayeux tomcatBayeux, CometEvent event, JSONObject msg) throws JSONException {
+        final String target = msg.optString(Bayeux.CHANNEL_FIELD);
+
+        if (Bayeux.META_HANDSHAKE.equals(target)) {
+            return new MetaHandshakeRequest(tomcatBayeux,event,msg);
+        }
+        if (Bayeux.META_CONNECT.equals(target)) {
+            return new MetaConnectRequest(tomcatBayeux,event,msg);
+        }
+        if (Bayeux.META_DISCONNECT.equals(target)) {
+            return new MetaDisconnectRequest(tomcatBayeux,event,msg);
+        }
+        if (Bayeux.META_SUBSCRIBE.equals(target)) {
+            return new MetaSubscribeRequest(tomcatBayeux,event,msg);
+        }
+        if (Bayeux.META_UNSUBSCRIBE.equals(target)) {
+            return new MetaUnsubscribeRequest(tomcatBayeux,event,msg);
+        }
+        // not a meta channel: handle as a publish
+        return new PublishRequest(tomcatBayeux,event,msg);
+    }
 }
\ No newline at end of file

Propchange: tomcat/trunk/modules/bayeux/java/org/apache/tomcat/bayeux/RequestFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message