flex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cd...@apache.org
Subject [33/51] [partial] flex-blazeds git commit: - Major code scrub
Date Wed, 15 Feb 2017 22:43:14 GMT
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/MessageBrokerServlet.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/MessageBrokerServlet.java b/core/src/flex/messaging/MessageBrokerServlet.java
deleted file mode 100644
index 0e8efb0..0000000
--- a/core/src/flex/messaging/MessageBrokerServlet.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging;
-
-import flex.management.MBeanLifecycleManager;
-import flex.management.MBeanServerLocatorFactory;
-import flex.messaging.config.ConfigurationManager;
-import flex.messaging.config.FlexConfigurationManager;
-import flex.messaging.config.MessagingConfiguration;
-import flex.messaging.endpoints.Endpoint;
-import flex.messaging.io.SerializationContext;
-import flex.messaging.io.TypeMarshallingContext;
-import flex.messaging.log.HTTPRequestLog;
-import flex.messaging.log.Log;
-import flex.messaging.log.LogCategories;
-import flex.messaging.log.Logger;
-import flex.messaging.log.LoggingHttpServletRequestWrapper;
-import flex.messaging.log.ServletLogTarget;
-import flex.messaging.services.AuthenticationService;
-import flex.messaging.util.ClassUtil;
-import flex.messaging.util.ExceptionUtil;
-import flex.messaging.util.Trace;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.Principal;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * The MessageBrokerServlet bootstraps the MessageBroker,
- * adds endpoints to it, and starts the broker. The servlet
- * also acts as a facade for all http-based endpoints, in that
- * the servlet receives the http request and then delegates to
- * an endpoint that can handle the request's content type. This
- * does not occur for non-http endpoints, such as the rtmp endpoint.
- *
- * @see flex.messaging.MessageBroker
- *
- */
-public class MessageBrokerServlet extends HttpServlet
-{
-    static final long serialVersionUID = -5293855229461612246L;
-
-    public static final String LOG_CATEGORY_STARTUP_BROKER = LogCategories.STARTUP_MESSAGEBROKER;
-    private static final String STRING_UNDEFINED_APPLICATION = "undefined";
-
-    private MessageBroker broker;
-    private HttpFlexSessionProvider httpFlexSessionProvider;
-    private static String FLEXDIR = "/WEB-INF/flex/";
-    private boolean log_errors = false;
-
-    /**
-     * Initializes the servlet in its web container, then creates
-     * the MessageBroker and adds Endpoints and Services to that broker.
-     * This servlet may keep a reference to an endpoint if it needs to
-     * delegate to it in the <code>service</code> method.
-     */
-    public void init(ServletConfig servletConfig) throws ServletException
-    {
-        super.init(servletConfig);
-
-        // allocate thread local variables
-        createThreadLocals();
-
-        // Set the servlet config as thread local
-        FlexContext.setThreadLocalObjects(null, null, null, null, null, servletConfig);
-
-        ServletLogTarget.setServletContext(servletConfig.getServletContext());
-
-        ClassLoader loader = getClassLoader();
-
-        if ("true".equals(servletConfig.getInitParameter("useContextClassLoader")))
-        {
-            loader = Thread.currentThread().getContextClassLoader();
-        }
-
-        // Should we wrap http request for later error logging?
-        log_errors = HTTPRequestLog.init(getServletContext());
-
-        // Start the broker
-        try
-        {
-            // Get the configuration manager
-            ConfigurationManager configManager = loadMessagingConfiguration(servletConfig);
-
-            // Load configuration
-            MessagingConfiguration config = configManager.getMessagingConfiguration(servletConfig);
-
-            // Set up logging system ahead of everything else.
-            config.createLogAndTargets();
-
-            // Create broker.
-            broker = config.createBroker(servletConfig.getInitParameter("messageBrokerId"), loader);
-
-            // Set the servlet config as thread local
-            FlexContext.setThreadLocalObjects(null, null, broker, null, null, servletConfig);
-
-            setupPathResolvers();
-
-            // Set initial servlet context on broker
-            broker.setServletContext(servletConfig.getServletContext());
-
-            Logger logger = Log.getLogger(ConfigurationManager.LOG_CATEGORY);
-            if (Log.isInfo())
-            {
-                logger.info(VersionInfo.buildMessage());
-            }
-
-            // Create endpoints, services, security, and logger on the broker based on configuration
-            config.configureBroker(broker);
-
-            long timeBeforeStartup = 0;
-            if (Log.isDebug())
-            {
-                timeBeforeStartup = System.currentTimeMillis();
-                Log.getLogger(LOG_CATEGORY_STARTUP_BROKER).debug("MessageBroker with id '{0}' is starting.",
-                        new Object[]{broker.getId()});
-            }
-
-            //initialize the httpSessionToFlexSessionMap
-            synchronized(HttpFlexSession.mapLock)
-            {
-                if (servletConfig.getServletContext().getAttribute(HttpFlexSession.SESSION_MAP) == null)
-                    servletConfig.getServletContext().setAttribute(HttpFlexSession.SESSION_MAP, new ConcurrentHashMap());
-            }
-
-            broker.start();
-
-            if (Log.isDebug())
-            {
-                long timeAfterStartup = System.currentTimeMillis();
-                Long diffMillis = timeAfterStartup - timeBeforeStartup;
-                Log.getLogger(LOG_CATEGORY_STARTUP_BROKER).debug("MessageBroker with id '{0}' is ready (startup time: '{1}' ms)",
-                        new Object[]{broker.getId(), diffMillis});
-            }
-
-            // Report replaced tokens
-            configManager.reportTokens();
-
-            // Report any unused properties.
-            config.reportUnusedProperties();
-
-            // Setup provider for FlexSessions that wrap underlying J2EE HttpSessions.
-            httpFlexSessionProvider = new HttpFlexSessionProvider();
-            broker.getFlexSessionManager().registerFlexSessionProvider(HttpFlexSession.class, httpFlexSessionProvider);
-
-            // clear the broker and servlet config as this thread is done
-            FlexContext.clearThreadLocalObjects();
-        }
-        catch (Throwable t)
-        {
-            // On any unhandled exception destroy the broker, log it and rethrow.
-            String applicationName = servletConfig.getServletContext().getServletContextName();
-            if (applicationName == null)
-                applicationName = STRING_UNDEFINED_APPLICATION;
-
-            System.err.println("**** MessageBrokerServlet in application '" + applicationName
-                    + "' failed to initialize due to runtime exception: "
-                    + ExceptionUtil.exceptionFollowedByRootCausesToString(t));
-            destroy();
-            // We used to throw  UnavailableException, but Weblogic didn't mark the webapp as failed. See bug FBR-237
-            throw new ServletException(t);
-        }
-    }
-
-    private void setupPathResolvers()
-    {
-        setupExternalPathResolver();
-        setupInternalPathResolver();
-    }
-
-    private void setupExternalPathResolver()
-    {
-        broker.setExternalPathResolver(
-                new MessageBroker.PathResolver()
-                {
-                    public InputStream resolve(String filename) throws FileNotFoundException
-                    {
-                        return new FileInputStream(new File(filename));
-                    }
-                }
-        );
-    }
-
-    private void setupInternalPathResolver()
-    {
-        broker.setInternalPathResolver(
-                new MessageBroker.InternalPathResolver()
-                {
-                    public InputStream resolve(String filename)
-                    {
-                        return getServletContext().getResourceAsStream(FLEXDIR + filename);
-                    }
-                }
-        );
-    }
-
-    private static ConfigurationManager loadMessagingConfiguration(ServletConfig servletConfig)
-    {
-        ConfigurationManager manager = null;
-        Class managerClass;
-        String className;
-
-        // Check for Custom Configuration Manager Specification
-        if (servletConfig != null)
-        {
-            String p = servletConfig.getInitParameter("services.configuration.manager");
-            if (p != null)
-            {
-                className = p.trim();
-                try
-                {
-                    managerClass = ClassUtil.createClass(className);
-                    manager = (ConfigurationManager)managerClass.newInstance();
-                }
-                catch (Throwable t)
-                {
-                    if (Trace.config) // Log is not initialized yet.
-                        Trace.trace("Could not load configuration manager as: " + className);
-                }
-            }
-        }
-
-        if (manager == null)
-        {
-            manager = new FlexConfigurationManager();
-        }
-
-        return manager;
-    }
-
-    /**
-     * Stops all endpoints in the MessageBroker, giving them a chance
-     * to perform any endpoint-specific clean up.
-     */
-    public void destroy()
-    {
-        if (broker != null)
-        {
-            broker.stop();
-            if (broker.isManaged())
-            {
-                MBeanLifecycleManager.unregisterRuntimeMBeans(broker);
-            }
-            // release static thread locals
-            destroyThreadLocals();
-        }
-    }
-
-    /**
-     * Handle an incoming request, and delegate to an endpoint based on
-     * content type, if appropriate. The content type mappings for endpoints
-     * are not externally configurable, and currently the AmfEndpoint
-     * is the only delegate.
-     */
-    public void service(HttpServletRequest req, HttpServletResponse res)
-    {
-        if (log_errors)
-        {
-            // Create a wrapper for the request object so we can save the body content
-            LoggingHttpServletRequestWrapper wrapper = new LoggingHttpServletRequestWrapper(req);
-            req = wrapper;
-
-            try
-            {
-                // Read the body content
-                wrapper.doReadBody();
-            }
-            catch (IOException ignore)
-            {
-                // ignore, the wrapper will preserve what content we were able to read.
-            }
-        }
-
-        try
-        {
-            // Update thread locals
-            broker.initThreadLocals();
-            // Set this first so it is in place for the session creation event.  The
-            // current session is set by the FlexSession stuff right when it is available.
-            // The threadlocal FlexClient is set up during message deserialization in the
-            // MessageBrokerFilter.
-            FlexContext.setThreadLocalObjects(null, null, broker, req, res, getServletConfig());
-
-            HttpFlexSession fs = httpFlexSessionProvider.getOrCreateSession(req);
-            Principal principal;
-            if(FlexContext.isPerClientAuthentication())
-            {
-                principal = FlexContext.getUserPrincipal();
-            }
-            else
-            {
-                principal = fs.getUserPrincipal();
-            }
-
-            if (principal == null && req.getHeader("Authorization") != null)
-            {
-                String encoded = req.getHeader("Authorization");
-                if (encoded.indexOf("Basic") > -1)
-                {
-                    encoded = encoded.substring(6); //Basic.length()+1
-                    try
-                    {
-                        ((AuthenticationService)broker.getService(AuthenticationService.ID)).decodeAndLogin(encoded, broker.getLoginManager());
-                    }
-                    catch (Exception e)
-                    {
-                        if (Log.isDebug())
-                            Log.getLogger(LogCategories.SECURITY).info("Authentication service could not decode and login: " + e.getMessage());
-                    }
-                }
-            }
-
-            String contextPath = req.getContextPath();
-            String pathInfo = req.getPathInfo();
-            String endpointPath = req.getServletPath();
-            if (pathInfo != null)
-                endpointPath = endpointPath + pathInfo;
-
-            Endpoint endpoint;
-            try
-            {
-                endpoint = broker.getEndpoint(endpointPath, contextPath);
-            }
-            catch (MessageException me)
-            {
-                if (Log.isInfo())
-                    Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Received invalid request for endpoint path '{0}'.", new Object[] {endpointPath});
-
-                if (!res.isCommitted())
-                {
-                    try
-                    {
-                        res.sendError(HttpServletResponse.SC_NOT_FOUND);
-                    }
-                    catch (IOException ignore)
-                    {}
-                }
-
-                return;
-            }
-
-            try
-            {
-                if (Log.isInfo())
-                {
-                    Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Channel endpoint {0} received request.",
-                                                                       new Object[] {endpoint.getId()});
-                }
-                endpoint.service(req, res);
-            }
-            catch (UnsupportedOperationException ue)
-            {
-                if (Log.isInfo())
-                {
-                    Log.getLogger(LogCategories.ENDPOINT_GENERAL).info("Channel endpoint {0} received request for an unsupported operation.",
-                                                                       new Object[] {endpoint.getId()},
-                                                                       ue);
-                }
-
-                if (!res.isCommitted())
-                {
-                    try
-                    {
-                        res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
-                    }
-                    catch (IOException ignore)
-                    {}
-                }
-            }
-        }
-        catch (Throwable t)
-        {
-            // Final resort catch block as recommended by Fortify as a potential System info leak
-            try
-            {
-                Log.getLogger(LogCategories.ENDPOINT_GENERAL).error("Unexpected error encountered in Message Broker servlet", t);
-                res.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
-            }
-            catch (IOException ignore)
-            {
-                // ignore
-            }
-
-        }
-        finally
-        {
-            if (log_errors)
-            {
-                String info = (String) req.getAttribute(HTTPRequestLog.HTTP_ERROR_INFO);
-                if (info != null)
-                {
-                    // Log the HttpRequest data
-                    System.out.println("Exception occurred while processing HTTP request: " + info + ", request details logged in " + HTTPRequestLog.getFileName());
-                    HTTPRequestLog.outputRequest(info, req);
-                }
-            }
-
-            FlexContext.clearThreadLocalObjects();
-        }
-    }
-
-    /**
-     * Hook for subclasses to override the class loader to use for loading user defined classes.
-     *
-     * @return the class loader for this class
-     */
-    protected ClassLoader getClassLoader()
-    {
-        return this.getClass().getClassLoader();
-    }
-
-
-    // Call ONLY on servlet startup
-    public static void createThreadLocals()
-    {
-        // allocate static thread local objects
-        FlexContext.createThreadLocalObjects();
-        SerializationContext.createThreadLocalObjects();
-        TypeMarshallingContext.createThreadLocalObjects();
-    }
-
-
-    // Call ONLY on servlet shutdown
-    protected static void destroyThreadLocals()
-    {
-        // clear static member variables
-        Log.clear();
-        MBeanServerLocatorFactory.clear();
-
-        // Destroy static thread local objects
-        FlexContext.releaseThreadLocalObjects();
-        SerializationContext.releaseThreadLocalObjects();
-        TypeMarshallingContext.releaseThreadLocalObjects();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/MessageClient.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/MessageClient.java b/core/src/flex/messaging/MessageClient.java
deleted file mode 100644
index 56768e7..0000000
--- a/core/src/flex/messaging/MessageClient.java
+++ /dev/null
@@ -1,1148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import flex.messaging.client.FlexClient;
-import flex.messaging.client.FlexClientOutboundQueueProcessor;
-import flex.messaging.client.OutboundQueueThrottleManager;
-import flex.messaging.config.ThrottleSettings;
-import flex.messaging.config.ThrottleSettings.Policy;
-import flex.messaging.log.LogCategories;
-import flex.messaging.log.Log;
-import flex.messaging.messages.AsyncMessage;
-import flex.messaging.messages.CommandMessage;
-import flex.messaging.messages.Message;
-import flex.messaging.services.MessageService;
-import flex.messaging.services.messaging.Subtopic;
-import flex.messaging.services.messaging.selector.JMSSelector;
-import flex.messaging.services.messaging.selector.JMSSelectorException;
-import flex.messaging.util.ExceptionUtil;
-import flex.messaging.util.TimeoutAbstractObject;
-import flex.messaging.util.StringUtils;
-
-/**
- * Represents a client-side MessageAgent instance.
- * Currently a server-side MessageClient is only created if its client-side counterpart has subscribed
- * to a destination for pushed data (e.g. Consumer). Client-side Producers do not result in the creation of
- * corresponding server-side MessageClient instances.
- *
- * Client-side MessageAgents communicate with the server over a Channel that corresponds to a FlexSession.
- * Server-side MessageClient instances are always created in the context of a FlexSession and when the FlexSession
- * is invalidated any associated MessageClients are invalidated as well.
- *
- * MessageClients may also be timed out on a per-destination basis and this is based on subscription inactivity.
- * If no messages are pushed to the MessageClient within the destination's subscription timeout period the
- * MessageClient will be shutdown even if the associated FlexSession is still active and connected.
- * Per-destination subscription timeout is an optional configuration setting, and should only be used when inactive
- * subscriptions should be shut down opportunistically to preserve server resources.
- */
-public class MessageClient extends TimeoutAbstractObject implements Serializable
-{
-    //--------------------------------------------------------------------------
-    //
-    // Public Static Variables
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Log category for MessageClient related messages.
-     */
-    public static final String MESSAGE_CLIENT_LOG_CATEGORY = LogCategories.CLIENT_MESSAGECLIENT;
-
-    //--------------------------------------------------------------------------
-    //
-    // Static Constants
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Serializable to support broadcasting subscription state across the cluster for
-     * optimized message routing.
-     */
-    static final long serialVersionUID = 3730240451524954453L;
-
-    //--------------------------------------------------------------------------
-    //
-    // Static Variables
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * The list of MessageClient created listeners.
-     */
-    private static final CopyOnWriteArrayList<MessageClientListener> createdListeners = new CopyOnWriteArrayList<MessageClientListener>();
-
-    //--------------------------------------------------------------------------
-    //
-    // Static Methods
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Adds a MessageClient created listener.
-     *
-     * @see flex.messaging.MessageClientListener
-     *
-     * @param listener The listener to add.
-     */
-    public static void addMessageClientCreatedListener(MessageClientListener listener)
-    {
-        if (listener != null)
-            createdListeners.addIfAbsent(listener);
-    }
-
-    /**
-     * Removes a MessageClient created listener.
-     *
-     * @see flex.messaging.MessageClientListener
-     *
-     * @param listener The listener to remove.
-     */
-    public static void removeMessageClientCreatedListener(MessageClientListener listener)
-    {
-        if (listener != null)
-            createdListeners.remove(listener);
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Private Static Methods
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Utility method.
-     */
-    private static boolean equalStrings(String a, String b)
-    {
-        return a == b || (a != null && a.equals(b));
-    }
-
-    /**
-     * Utility method.
-     */
-    static int compareStrings(String a, String b)
-    {
-        if (a == b)
-            return 0;
-
-        if (a != null && b != null)
-            return a.compareTo(b);
-
-        if (a == null)
-            return -1;
-
-        return 1;
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Constructor
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     *
-     * Constructs a new MessageClient for local use.
-     *
-     * @param clientId The clientId for the MessageClient.
-     * @param destination The destination the MessageClient is subscribed to.
-     * @param endpointId The Id of the endpoint this MessageClient subscription was created over.
-     */
-    public MessageClient(Object clientId, Destination destination, String endpointId)
-    {
-        this(clientId, destination, endpointId, true);
-    }
-
-    /**
-     *
-     * Constructs a new MessageClient.
-     *
-     * @param clientId The clientId for the MessageClient.
-     * @param destination The destination the MessageClient is subscribed to.
-     * @param endpointId The Id of the endpoint this MessageClient subscription was created over.
-     * @param useSession RemoteMessageClient instances should not be associated with a FlexSession (pass false).
-     */
-    public MessageClient(Object clientId, Destination destination, String endpointId, boolean useSession)
-    {
-        valid = true;
-        this.clientId = clientId;
-        this.destination = destination;
-        this.endpointId = endpointId;
-        destinationId = destination.getId();
-        updateLastUse(); // Initialize last use timestamp to construct time.
-
-        /* If this is for a remote server, we do not associate with the session. */
-        if (useSession)
-        {
-            flexSession = FlexContext.getFlexSession();
-            flexSession.registerMessageClient(this);
-
-            flexClient = FlexContext.getFlexClient();
-            flexClient.registerMessageClient(this);
-
-            // SubscriptionManager will notify the created listeners, once
-            // subscription state is setup completely.
-            // notifyCreatedListeners();
-        }
-        else
-        {
-            flexClient = null;
-            flexSession = null;
-            // Use an instance level lock.
-            lock = new Object();
-            // On a remote server we don't notify created listeners.
-        }
-
-        if (Log.isDebug())
-            Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient created with clientId '" + this.clientId + "' for destination '" + destinationId + "'.");
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Variables
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     *  This flag is set to true when the client channel that this subscription was
-     *  established over is disconnected.
-     *  It supports cleaning up per-endpoint outbound queues maintained by the FlexClient.
-     *  If the client notifies the server that its channel is disconnecting, the FlexClient
-     *  does not need to maintain an outbound queue containing a subscription invalidation
-     *  message for this MessageClient to send to the client.
-     */
-    private volatile boolean clientChannelDisconnected;
-
-    /**
-     *  The clientId for the MessageClient.
-     *  This value is specified by the client directly or is autogenerated on the client.
-     */
-    protected final Object clientId;
-
-    /**
-     * Internal reference to the associated Destination; don't expose this in the public API.
-     */
-    protected final Destination destination;
-
-    /**
-     *  The destination the MessageClient is subscribed to.
-     */
-    protected final String destinationId;
-
-    /**
-     * The set of session destroy listeners to notify when the session is destroyed.
-     */
-    private transient volatile CopyOnWriteArrayList destroyedListeners;
-
-    /**
-     * The Id for the endpoint this MessageClient subscription was created over.
-     */
-    private String endpointId;
-
-    /**
-     * The FlexClient associated with the MessageClient.
-     */
-    private final transient FlexClient flexClient;
-
-    /**
-     * The FlexSession associated with the MessageClient.
-     * Not final because this needs to be reset if the subscription fails over to a new endpoint.
-     */
-    private transient FlexSession flexSession;
-
-    /**
-     * Flag used to break cycles during invalidation.
-     */
-    private boolean invalidating;
-
-    /**
-     * The lock to use to guard all state changes for the MessageClient.
-     */
-    protected Object lock = new Object();
-
-    /**
-     * Flag indicating whether the MessageClient is attempting to notify the remote client of
-     * its invalidation.
-     */
-    private volatile boolean attemptingInvalidationClientNotification;
-
-    /**
-     * A counter used to control invalidation for a MessageClient that has multiple
-     * subscriptions to its destination.
-     * Unsubscribing from one will not invalidate the MessageClient as long as other
-     * subscriptions remain active.
-     */
-    private transient int numReferences;
-
-    /**
-     * A set of all of the subscriptions managed by this message client.
-     */
-    protected final Set<SubscriptionInfo> subscriptions = new CopyOnWriteArraySet<SubscriptionInfo>();
-
-    /**
-     * Flag indicating whether this client is valid or not.
-     */
-    protected boolean valid;
-
-    /**
-     * Flag that indicates whether the MessageClient has a per-destination subscription timeout.
-     * If false, the MessageClient will remain valid until its associated FlexSession is invalidated.
-     */
-    private volatile boolean willTimeout;
-
-    /**
-     * Has anyone explicitly registered this message client.  This indicates that
-     * there is a reference to this MessageClient which is not an explicit subscription.
-     * This is a hook for FDMS and other adapters which want to use pushMessageToClients
-     * with clientIds but that do not want the subscription manager to manage subscriptions
-     * for them.
-     */
-    private volatile boolean registered = false;
-
-    //--------------------------------------------------------------------------
-    //
-    // Public Methods
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Returns the clientId for the MessageClient.
-     *
-     * @return The clientId for the MessageClient.
-     */
-    public Object getClientId()
-    {
-        return clientId; // Field is final; no need to sync.
-    }
-
-    /**
-     * Returns the destination the MessageClient is subscribed to.
-     *
-     * @return The destination the MessageClient is subscribed to.
-     */
-    public Destination getDestination()
-    {
-        return destination; // Field is final; no need to sync.
-    }
-
-    /**
-     * Returns the id of the destination the MessageClient is subscribed to.
-     *
-     * @return The id of the destination the MessageClient is subscribed to.
-     */
-    public String getDestinationId()
-    {
-        return destinationId; // Field is final; no need to sync.
-    }
-
-    /**
-     * Returns the Id for the endpoint the MessageClient subscription was created over.
-     *
-     * @return The Id for the endpoint the MessageClient subscription was created over.
-     */
-    public String getEndpointId()
-    {
-        return endpointId; // Field is final; no need to sync.
-    }
-
-    /**
-     * Returns the FlexClient associated with this MessageClient.
-     *
-     * @return The FlexClient assocaited with this MessageClient.
-     */
-    public FlexClient getFlexClient()
-    {
-        return flexClient; // Field is final; no need to sync.
-    }
-
-    /**
-     * Returns the FlexSession associated with this MessageClient.
-     *
-     * @return The FlexSession associated with this MessageClient.
-     */
-    public FlexSession getFlexSession()
-    {
-        synchronized (lock)
-        {
-            return flexSession;
-        }
-    }
-
-    /**
-     * Returns the number of subscriptions associated with this MessageClient.
-     *
-     * @return The number of subscriptions associated with this MessageClient.
-     */
-    public int getSubscriptionCount()
-    {
-        int count;
-
-        synchronized (lock)
-        {
-            count = subscriptions != null? subscriptions.size() : 0;
-        }
-
-        return count;
-    }
-
-    /**
-     *
-     * This is used for FlexClient outbound queue management. When a MessageClient is invalidated
-     * if it is attempting to notify the client, then we must leave the outbound queue containing
-     * the notification in place. Otherwise, any messages queued for the subscription may be
-     * removed from the queue and possibly shut down immediately.
-     *
-     * @return true if the MessageClient is currently trying to notify the client about it's invalidation.
-     */
-    public boolean isAttemptingInvalidationClientNotification()
-    {
-        return attemptingInvalidationClientNotification;
-    }
-
-    /**
-     *
-     * This is set to true when the MessageClient is invalidated due to the client
-     * channel the subscription was established over disconnecting.
-     * It allows the FlexClient class to cleanup the outbound queue for the channel's
-     * corresponding server endpoint for the remote client, because we know that no
-     * currently queued messages need to be retained for delivery.
-     *
-     * @param value true if the MessageClient is invalidated due to the client being disconnected
-     */
-    public void setClientChannelDisconnected(boolean value)
-    {
-        clientChannelDisconnected = value;
-    }
-
-    /**
-     * @return true if the MessageClient is invalidated due to the client being disconnected
-     */
-    public boolean isClientChannelDisconnected()
-    {
-        return clientChannelDisconnected;
-    }
-
-    /**
-     *
-     * This is true when some code other than the SubscriptionManager
-     * is maintaining subscriptions for this message client.  It ensures
-     * that we have this MessageClient kept around until the session
-     * expires.
-     */
-    public void setRegistered(boolean reg)
-    {
-        registered = reg;
-    }
-
-    /**
-     *
-     */
-    public boolean isRegistered()
-    {
-        return registered;
-    }
-
-    /**
-     * Adds a MessageClient destroy listener.
-     *
-     * @see flex.messaging.MessageClientListener
-     *
-     * @param listener The listener to add.
-     */
-    public void addMessageClientDestroyedListener(MessageClientListener listener)
-    {
-        if (listener != null)
-        {
-            checkValid();
-
-            if (destroyedListeners == null)
-            {
-                synchronized (lock)
-                {
-                    if (destroyedListeners == null)
-                        destroyedListeners = new CopyOnWriteArrayList();
-                }
-            }
-
-            destroyedListeners.addIfAbsent(listener);
-        }
-    }
-
-    /**
-     * Removes a MessageClient destroyed listener.
-     *
-     * @see flex.messaging.MessageClientListener
-     *
-     * @param listener The listener to remove.
-     */
-    public void removeMessageClientDestroyedListener(MessageClientListener listener)
-    {
-        // No need to check validity; removing a listener is always ok.
-        if (listener != null && destroyedListeners != null)
-            destroyedListeners.remove(listener);
-    }
-
-    /**
-     *
-     * Adds a subscription to the subscription set for this MessageClient.
-     *
-     * @param selector The selector expression used for the subscription.
-     * @param subtopic The subtopic used for the subscription.
-     * @param maxFrequency The maximum number of messages the client wants to
-     * receive per second (0 disables this limit).
-     */
-    public void addSubscription(String selector, String subtopic, int maxFrequency)
-    {
-        synchronized (lock)
-        {
-            checkValid();
-
-            incrementReferences();
-
-            // Create and add the subscription to the subscriptions set.
-            SubscriptionInfo si = new SubscriptionInfo(selector, subtopic, maxFrequency);
-            subscriptions.add(si);
-
-            registerSubscriptionWithThrottleManager(si);
-        }
-    }
-
-    /**
-     *
-     * Registers  the subscription with the outbound queue processor's throttle
-     * manager, if one exists.
-     *
-     * @param si The subscription info object.
-     */
-    public void registerSubscriptionWithThrottleManager(SubscriptionInfo si)
-    {
-        // Register the destination that will setup client level outbound throttling.
-        ThrottleSettings ts = destination.getNetworkSettings().getThrottleSettings();
-        if (ts.getOutboundPolicy() != Policy.NONE && (ts.isOutboundClientThrottleEnabled() || si.maxFrequency > 0))
-        {
-            // Setup the client level outbound throttling, and register the destination
-            // only if the policy is not NONE, and a throttling limit is specified
-            // either at the destination or by consumer.
-            OutboundQueueThrottleManager throttleManager = getThrottleManager(true);
-            if (throttleManager != null)
-                throttleManager.registerDestination(destinationId, ts.getOutgoingClientFrequency(), ts.getOutboundPolicy());
-        }
-        else if (si.maxFrequency > 0) // Let the client know that maxFrequency will be ignored.
-        {
-            if (Log.isWarn())
-                Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).warn("MessageClient with clientId '"
-                        + clientId + "' for destination '" + destinationId
-                        + "' specified a maxFrequency value of '" + si.maxFrequency
-                        + "' but the destination does not define a throttling policy. This value will be ignored.");
-        }
-
-        // Now, register the subscription.
-        OutboundQueueThrottleManager throttleManager = getThrottleManager(false);
-        if (throttleManager != null)
-            throttleManager.registerSubscription(destinationId, si);
-    }
-
-    /**
-     *
-     * Removes a subscription from the subscription set for this MessageClient.
-     *
-     * @param selector The selector expression for the subscription.
-     * @param subtopic The subtopic for the subscription.
-     * @return true if no subscriptions remain for this MessageClient; otherwise false.
-     */
-    public boolean removeSubscription(String selector, String subtopic)
-    {
-        synchronized (lock)
-        {
-            SubscriptionInfo si = new SubscriptionInfo(selector, subtopic);
-            if (subscriptions.remove(si))
-            {
-                unregisterSubscriptionWithThrottleManager(si);
-                return decrementReferences();
-            }
-            else if (Log.isError())
-            {
-                Log.getLogger(MessageService.LOG_CATEGORY).error("Error - unable to find subscription to remove for MessageClient: "
-                        + clientId + " selector: " + selector + " subtopic: " + subtopic);
-            }
-            return numReferences == 0;
-        }
-    }
-
-    /**
-     *
-     * We use the same MessageClient for more than one subscription with different
-     * selection criteria.  This tracks the number of subscriptions that are active
-     * so that we know when we are finished.
-     */
-    public void incrementReferences()
-    {
-        synchronized (lock)
-        {
-            numReferences++;
-        }
-    }
-
-    /**
-     *
-     * Decrements the numReferences variable and returns true if this was the last reference.
-     */
-    public boolean decrementReferences()
-    {
-        synchronized (lock)
-        {
-            if (--numReferences == 0)
-            {
-                cancelTimeout();
-                if (destination instanceof MessageDestination)
-                {
-                    MessageDestination msgDestination = (MessageDestination)destination;
-                    if (msgDestination.getThrottleManager() != null)
-                        msgDestination.getThrottleManager().removeClientThrottleMark(clientId);
-                }
-                return true;
-            }
-            return false;
-        }
-    }
-
-    /**
-     *
-     * Invoked by SubscriptionManager once the subscription state is setup completely
-     * for the MessageClient..
-     */
-    public void notifyCreatedListeners()
-    {
-        // Notify MessageClient created listeners.
-        if (!createdListeners.isEmpty())
-        {
-            // CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions.
-            for (Iterator iter = createdListeners.iterator(); iter.hasNext();)
-                ((MessageClientListener)iter.next()).messageClientCreated(this);
-        }
-    }
-
-    /**
-     *
-     * Invoked by SubscriptionManager while handling a subscribe request.
-     * If the request is updating an existing subscription the 'push' state in the associated FlexClient
-     * may need to be updated to ensure that the correct endpoint is used for this subscription.
-     *
-     * @param newEndpointId The id for the new endpoint that the subscription may have failed over to.
-     */
-    public void resetEndpoint(String newEndpointId)
-    {
-        String oldEndpointId = null;
-        FlexSession oldSession = null;
-        FlexSession newSession = FlexContext.getFlexSession();
-        synchronized (lock)
-        {
-            // If anything is null, or nothing has changed, no need for a reset.
-            if (endpointId == null || newEndpointId == null || flexSession == null || newSession == null || (endpointId.equals(newEndpointId) && flexSession.equals(newSession)))
-                return;
-
-            oldEndpointId = endpointId;
-            endpointId = newEndpointId;
-
-            oldSession = flexSession;
-            flexSession = newSession;
-        }
-
-        // Unregister in order to reset the proper push settings in the re-registration below once the session association has been patched.
-        if (flexClient != null)
-            flexClient.unregisterMessageClient(this);
-
-        // Clear out any reference to this subscription that the previously associated session has.
-        if (oldSession != null)
-            oldSession.unregisterMessageClient(this);
-
-        // Associate the current session with this subscription.
-        if (flexSession != null)
-            flexSession.registerMessageClient(this);
-
-        // Reset proper push settings.
-        if (flexClient != null)
-            flexClient.registerMessageClient(this);
-
-        if (Log.isDebug())
-        {
-            String msg = "MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been reset as a result of a resubscribe.";
-            if (oldEndpointId != null && !oldEndpointId.equals(newEndpointId))
-                msg += " Endpoint change [" + oldEndpointId + " -> " + newEndpointId + "]";
-            if ((oldSession != null) && (newSession != null) && (oldSession != newSession)) // Test identity.
-                msg += " FlexSession change [" + oldSession.getClass().getName() + ":" + oldSession.getId() + " -> " + newSession.getClass().getName() + ":" + newSession.getId() + "]";
-
-            Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug(msg);
-        }
-    }
-
-    /**
-     *
-     * Used to test whether this client should receive this message
-     * based on the list of subscriptions we have recorded for it.
-     * It must match both the subtopic and the selector expression.
-     * Usually this is done by the subscription manager - this logic is
-     * only here to maintain api compatibility with one of the variants
-     * of the pushMessageToClients which has subscriberIds and an evalSelector
-     * property.
-     *
-     * @param message The message to test.
-     */
-    public boolean testMessage(Message message, MessageDestination destination)
-    {
-        String subtopic = (String) message.getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME);
-        String subtopicSeparator = destination.getServerSettings().getSubtopicSeparator();
-        synchronized (lock)
-        {
-            for (SubscriptionInfo si : subscriptions)
-            {
-                if (si.matches(message, subtopic, subtopicSeparator))
-                    return true;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Returns true if the MessageClient is valid; false if it has been invalidated.
-     *
-     * @return true if the MessageClient is valid; otherwise false.
-     */
-    public boolean isValid()
-    {
-        synchronized (lock)
-        {
-            return valid;
-        }
-    }
-    
-    /**
-     * Invalidates the MessageClient.
-     */
-    public void invalidate()
-    {
-        invalidate(false /* don't attempt to notify the client */);
-    }
-
-    /**
-     * Invalidates the MessageClient, and optionally attempts to notify the client that
-     * this subscription has been invalidated.
-     * This overload is used when a subscription is timed out while the client is still
-     * actively connected to the server but should also be used by any custom code on the server
-     * that invalidates MessageClients but wishes to notify the client cleanly.
-     *
-     * @param notifyClient <code>true</code> to notify the client that its subscription has been
-     *                     invalidated.
-     */
-    public void invalidate(boolean notifyClient)
-    {
-        synchronized (lock)
-        {
-            if (!valid || invalidating)
-                return; // Already shutting down.
-
-            invalidating = true; // This thread gets to shut the MessageClient down.
-            cancelTimeout();
-        }
-
-        // Record whether we're attempting to notify the client or not.
-        attemptingInvalidationClientNotification = notifyClient;
-
-        // Build a subscription invalidation message and push to the client if it is still valid.
-        if (notifyClient && flexClient != null && flexClient.isValid())
-        {
-            CommandMessage msg = new CommandMessage();
-            msg.setDestination(destination.getId());
-            msg.setClientId(clientId);
-            msg.setOperation(CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION);
-            Set subscriberIds = new TreeSet();
-            subscriberIds.add(clientId);
-            try
-            {
-                if (destination instanceof MessageDestination)
-                {
-                    MessageDestination msgDestination = (MessageDestination)destination;
-                    ((MessageService)msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, msg, false /* don't eval selector */);
-                }
-            }
-            catch (MessageException ignore) {}
-        }
-
-        // Notify messageClientDestroyed listeners that we're being invalidated.
-        if (destroyedListeners != null && !destroyedListeners.isEmpty())
-        {
-            for (Iterator iter = destroyedListeners.iterator(); iter.hasNext();)
-            {
-                ((MessageClientListener)iter.next()).messageClientDestroyed(this);
-            }
-            destroyedListeners.clear();
-        }
-
-        // And generate unsubscribe messages for all of the MessageClient's subscriptions and
-        // route them to the destination this MessageClient is subscribed to.
-        // The reason we send a message to the service rather than just going straight to the SubscriptionManager
-        // is that some adapters manage their own subscription state (i.e. JMS) in addition to us keeping track of
-        // things with our SubscriptionManager.
-        ArrayList<CommandMessage> unsubMessages = new ArrayList<CommandMessage>();
-        synchronized (lock)
-        {
-            for (SubscriptionInfo subInfo : subscriptions)
-            {
-                CommandMessage unsubMessage = new CommandMessage();
-                unsubMessage.setDestination(destination.getId());
-                unsubMessage.setClientId(clientId);
-                unsubMessage.setOperation(CommandMessage.UNSUBSCRIBE_OPERATION);
-                unsubMessage.setHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER, Boolean.TRUE);
-                unsubMessage.setHeader(CommandMessage.SELECTOR_HEADER, subInfo.selector);
-                unsubMessage.setHeader(AsyncMessage.SUBTOPIC_HEADER_NAME, subInfo.subtopic);
-                unsubMessages.add(unsubMessage);
-            }
-        }
-        // Release the lock and send the unsub messages.
-        for (CommandMessage unsubMessage : unsubMessages)
-        {
-            try
-            {
-                destination.getService().serviceCommand(unsubMessage);
-            }
-            catch (MessageException me)
-            {
-                if (Log.isDebug())
-                    Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient: " + getClientId() + " issued an unsubscribe message during invalidation that was not processed but will continue with invalidation. Reason: " + ExceptionUtil.toString(me));
-            }
-        }
-
-        synchronized (lock)
-        {
-            // If we didn't clean up all subscriptions log an error and continue with shutdown.
-            int remainingSubscriptionCount = subscriptions.size();
-            if (remainingSubscriptionCount > 0 && Log.isError())
-                Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).error("MessageClient: " + getClientId() + " failed to remove " + remainingSubscriptionCount + " subscription(s) during invalidation");
-        }
-
-        // If someone registered this message client, invalidating it will free
-        // their reference which will typically also remove this message client.
-        if (registered && destination instanceof MessageDestination)
-            ((MessageDestination)destination).getSubscriptionManager().releaseMessageClient(this);
-
-        synchronized (lock)
-        {
-            valid = false;
-            invalidating = false;
-        }
-
-        if (Log.isDebug())
-            Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been invalidated.");
-    }
-
-    /**
-     * Pushes the supplied message and then invalidates the MessageClient.
-     *
-     * @param message The message to push to the client before invalidating.
-     * When message is null, MessageClient is invalidated silently.
-     */
-    public void invalidate(Message message)
-    {
-        if (message != null)
-        {
-            message.setDestination(destination.getId());
-            message.setClientId(clientId);
-
-            Set subscriberIds = new TreeSet();
-            subscriberIds.add(clientId);
-            try
-            {
-                if (destination instanceof MessageDestination)
-                {
-                    MessageDestination msgDestination = (MessageDestination)destination;
-                    ((MessageService)msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, message, false /* don't eval selector */);
-                }
-            }
-            catch (MessageException ignore) {}
-
-            invalidate(true /* attempt to notify remote client */);
-        }
-        else
-        {
-            invalidate();
-        }
-    }
-
-    /**
-     *
-     * Compares this MessageClient to the specified object. The result is true if
-     * the argument is not null and is a MessageClient instance with a matching
-     * clientId value.
-     *
-     * @param o The object to compare this MessageClient to.
-     * @return true if the MessageClient is equal; otherwise false.
-     */
-    public boolean equals(Object o)
-    {
-        if (o instanceof MessageClient)
-        {
-            MessageClient c = (MessageClient) o;
-            if (c != null && c.getClientId().equals(clientId))
-                return true;
-        }
-        return false;
-    }
-
-    /**
-     *
-     * Returns the hash code for this MessageClient. The returned value is
-     * the hash code for the MessageClient's clientId property.
-     *
-     * @return The hash code value for this MessageClient.
-     */
-    @Override
-    public int hashCode()
-    {
-        return getClientId().hashCode();
-    }
-
-    /**
-     *
-     * The String representation of this MessageClient is returned (its clientId value).
-     *
-     * @return The clientId value for this MessageClient.
-     */
-    @Override
-    public String toString()
-    {
-        return String.valueOf(clientId);
-    }
-
-    //----------------------------------
-    //  TimeoutAbstractObject overrides
-    //----------------------------------
-
-    /**
-     *
-     * Implements TimeoutCapable.
-     * This method returns the timeout value configured for the MessageClient's destination.
-     */
-    @Override
-    public long getTimeoutPeriod()
-    {
-        return (destination instanceof MessageDestination) ? 
-                ((MessageDestination)destination).getSubscriptionManager().getSubscriptionTimeoutMillis() : 0;
-    }
-
-    /**
-     *
-     * Implements TimeoutCapable.
-     * This method is invoked when the MessageClient has timed out and it
-     * invalidates the MessageClient.
-     */
-    public void timeout()
-    {
-        invalidate(true /* notify client */);
-    }
-
-    /**
-     *
-     * Returns true if a timeout task is running for this MessageClient.
-     */
-    public boolean isTimingOut()
-    {
-        return willTimeout;
-    }
-
-    /**
-     *
-     * Records whether a timeout task is running for this MessageClient.
-     */
-    public void setTimingOut(boolean value)
-    {
-        willTimeout = value;
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Private Methods
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Utility method that tests validity and throws an exception if the instance
-     * has been invalidated.
-     */
-    private void checkValid()
-    {
-        synchronized (lock)
-        {
-            if (!valid)
-            {
-                throw new RuntimeException("MessageClient has been invalidated."); // TODO - localize
-            }
-        }
-    }
-
-    private OutboundQueueThrottleManager getThrottleManager(boolean create)
-    {
-        if (flexClient != null)
-        {
-            FlexClientOutboundQueueProcessor processor = flexClient.getOutboundQueueProcessor(endpointId);
-            if (processor != null)
-                return create? processor.getOrCreateOutboundQueueThrottleManager() : processor.getOutboundQueueThrottleManager();
-        }
-        return null;
-    }
-
-    private void unregisterSubscriptionWithThrottleManager(SubscriptionInfo si)
-    {
-        OutboundQueueThrottleManager throttleManager = getThrottleManager(false);
-        if (throttleManager != null)
-            throttleManager.unregisterSubscription(destinationId, si);
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Nested Classes
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Represents a MessageClient's subscription to a destination.
-     * It captures the optional selector expression and subtopic for the
-     * subscription.
-     */
-    public static class SubscriptionInfo implements Comparable
-    {
-        public String selector, subtopic;
-        public int maxFrequency; // maxFrequency per subscription. Not used in BlazeDS.
-
-        public SubscriptionInfo(String sel, String sub)
-        {
-            this(sel, sub, 0);
-        }
-
-        public SubscriptionInfo(String sel, String sub, int maxFrequency)
-        {
-            this.selector = sel;
-            this.subtopic = sub;
-            this.maxFrequency = maxFrequency;
-        }
-
-        @Override
-        public boolean equals(Object o)
-        {
-            if (o instanceof SubscriptionInfo)
-            {
-                SubscriptionInfo other = (SubscriptionInfo) o;
-                return equalStrings(other.selector, selector) &&
-                       equalStrings(other.subtopic, subtopic);
-            }
-            return false;
-        }
-
-        @Override
-        public int hashCode()
-        {
-            return (selector == null ? 0 : selector.hashCode()) +
-                   (subtopic == null ? 1 : subtopic.hashCode());
-        }
-
-        /**
-         * Compares the two subscription infos (being careful to
-         * ensure we compare in a consistent way if the arguments
-         * are switched).
-         * @param o the object to compare
-         * @return int the compare result
-         */
-        public int compareTo(Object o)
-        {
-            SubscriptionInfo other = (SubscriptionInfo) o;
-            int result;
-
-            if ((result = compareStrings(other.selector, selector)) != 0)
-                return result;
-            else if ((result = compareStrings(other.subtopic, subtopic)) != 0)
-                return result;
-
-            return 0;
-        }
-
-        /**
-         * Check whether the message matches with selected subtopic.
-         * @param message current message
-         * @param subtopicToMatch subtopc string
-         * @param subtopicSeparator suptopic separator 
-         * @return true if the message matches the subtopic
-         */
-        public boolean matches(Message message, String subtopicToMatch, String subtopicSeparator)
-        {
-            if ((subtopicToMatch == null && subtopic != null) || (subtopicToMatch != null && subtopic == null))
-                return false; // If either defines a subtopic, they both must define one.
-
-            // If both define a subtopic, they must match.
-            if (subtopicToMatch != null && subtopic != null)
-            {
-                Subtopic consumerSubtopic = new Subtopic(subtopic, subtopicSeparator);
-                Subtopic messageSubtopic = new Subtopic(subtopicToMatch, subtopicSeparator);
-                if (!consumerSubtopic.matches(messageSubtopic))
-                    return false; // Not a match.
-            }
-
-            if (selector == null)
-                return true;
-
-            JMSSelector jmsSelector = new JMSSelector(selector);
-            try
-            {
-                if (jmsSelector.match(message))
-                    return true;
-            }
-            catch (JMSSelectorException jmse)
-            {
-                // Log a warning for this client's selector and continue
-                if (Log.isWarn())
-                {
-                    Log.getLogger(JMSSelector.LOG_CATEGORY).warn("Error processing message selector: " +
-                         jmse.toString() + StringUtils.NEWLINE +
-                         "  incomingMessage: " + message + StringUtils.NEWLINE +
-                         "  selector: " + selector + StringUtils.NEWLINE);
-                }
-            }
-            return false;
-        }
-
-        /**
-         * Returns a String representation of the subscription info.
-         * @return String the string representation of the subscription info
-         */
-        public String toString()
-        {
-            StringBuffer sb = new StringBuffer();
-            sb.append("Subtopic: " + subtopic + StringUtils.NEWLINE);
-            sb.append("Selector: " + selector + StringUtils.NEWLINE);
-            if (maxFrequency > 0)
-                sb.append("maxFrequency: " + maxFrequency);
-            return sb.toString();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/MessageClientListener.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/MessageClientListener.java b/core/src/flex/messaging/MessageClientListener.java
deleted file mode 100644
index 7cfdb90..0000000
--- a/core/src/flex/messaging/MessageClientListener.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging;
-
-/**
- * Interface to be notified when a MessageClient is created or destroyed. Implementations of this interface
- * may add themselves as listeners statically via <code>MessageClient.addMessageClientCreatedListener()</code>.
- * To listen for MessageClient destruction, the implementation class instance must add itself as a listener to
- * a specific MessageClient instance via the <code>addMessageClientDestroyedListener()</code> method.
- */
-public interface MessageClientListener 
-{
-    /**
-     * Notification that a MessageClient was created.
-     * 
-     * @param messageClient The MessageClient that was created.
-     */
-    void messageClientCreated(MessageClient messageClient);
-    
-    /**
-     * Notification that a MessageClient is about to be destroyed.
-     * 
-     * @param messageClient The MessageClient that will be destroyed.
-     */
-    void messageClientDestroyed(MessageClient messageClient);
-}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/8315f8fa/core/src/flex/messaging/MessageDestination.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/MessageDestination.java b/core/src/flex/messaging/MessageDestination.java
deleted file mode 100644
index 141ac8a..0000000
--- a/core/src/flex/messaging/MessageDestination.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging;
-
-import flex.management.runtime.messaging.MessageDestinationControl;
-import flex.management.runtime.messaging.services.messaging.SubscriptionManagerControl;
-import flex.management.runtime.messaging.services.messaging.ThrottleManagerControl;
-import flex.messaging.config.ConfigurationConstants;
-import flex.messaging.config.ConfigurationException;
-import flex.messaging.config.DestinationSettings;
-import flex.messaging.config.ThrottleSettings;
-import flex.messaging.config.ConfigMap;
-import flex.messaging.config.NetworkSettings;
-import flex.messaging.config.ServerSettings;
-import flex.messaging.config.ThrottleSettings.Policy;
-import flex.messaging.log.LogCategories;
-import flex.messaging.services.MessageService;
-import flex.messaging.services.Service;
-import flex.messaging.services.messaging.SubscriptionManager;
-import flex.messaging.services.messaging.RemoteSubscriptionManager;
-import flex.messaging.services.messaging.ThrottleManager;
-import flex.messaging.services.messaging.MessagingConstants;
-import flex.messaging.util.ClassUtil;
-
-/**
- * A logical reference to a MessageDestination.
- */
-public class MessageDestination extends FactoryDestination
-{
-    static final long serialVersionUID = -2016911808141319012L;
-
-    /** Log category for <code>MessageDestination</code>.*/
-    public static final String LOG_CATEGORY = LogCategories.SERVICE_MESSAGE;
-
-    // Errors
-    private static final int UNSUPPORTED_POLICY = 10124;
-
-    // Destination properties
-    private transient ServerSettings serverSettings;
-
-    // Destination internal
-    private transient SubscriptionManager subscriptionManager;
-    private transient RemoteSubscriptionManager remoteSubscriptionManager;
-    private transient ThrottleManager throttleManager;
-
-    private transient MessageDestinationControl controller;
-
-    //--------------------------------------------------------------------------
-    //
-    // Constructor
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Constructs an unmanaged <code>MessageDestination</code> instance.
-     */
-    public MessageDestination()
-    {
-        this(false);
-    }
-
-    /**
-     * Constructs a <code>MessageDestination</code> with the indicated management.
-     *
-     * @param enableManagement <code>true</code> if the <code>MessageDestination</code>
-     * is manageable; otherwise <code>false</code>.
-     */
-    public MessageDestination(boolean enableManagement)
-    {
-        super(enableManagement);
-
-        serverSettings = new ServerSettings();
-
-        // Managers
-        subscriptionManager = new SubscriptionManager(this);
-        remoteSubscriptionManager = new RemoteSubscriptionManager(this);
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Initialize, validate, start, and stop methods.
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Initializes the <code>MessageDestination</code> with the properties.
-     * If subclasses override, they must call <code>super.initialize()</code>.
-     *
-     * @param id The id of the destination.
-     * @param properties Properties for the <code>MessageDestination</code>.
-     */
-    @Override
-    public void initialize(String id, ConfigMap properties)
-    {
-        super.initialize(id, properties);
-
-        if (properties == null || properties.size() == 0)
-            return;
-
-        // Network properties
-        network(properties);
-
-        // Server properties
-        server(properties);
-    }
-
-    /**
-     * Sets up the throttle manager before it starts.
-     */
-    @Override
-    public void start()
-    {
-        // Create the throttle manager, only if needed.
-        if (networkSettings.getThrottleSettings() != null)
-        {
-            ThrottleSettings settings = networkSettings.getThrottleSettings();
-            if (settings.isClientThrottleEnabled() || settings.isDestinationThrottleEnabled())
-            {
-                settings.setDestinationName(getId());
-                throttleManager = createThrottleManager();
-                throttleManager.setThrottleSettings(settings);
-                throttleManager.start();
-            }
-        }
-        super.start();
-    }
-
-    /**
-     * Stops the subscription, remote subscription, and throttle managers and
-     * then calls super class's stop.
-     */
-    @Override
-    public void stop()
-    {
-        if (isStarted())
-        {
-            subscriptionManager.stop();
-            remoteSubscriptionManager.stop();
-            if (throttleManager != null)
-                throttleManager.stop();
-        }
-        super.stop();
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Public Getters and Setters for Destination properties
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Sets the <code>NetworkSettings</code> of the <code>MessageDestination</code>.
-     *
-     * @param networkSettings The <code>NetworkSettings</code> of the <code>MessageDestination</code>
-     */
-    @Override
-    public void setNetworkSettings(NetworkSettings networkSettings)
-    {
-        super.setNetworkSettings(networkSettings);
-
-        // Set the subscription manager settings if needed.
-        if (networkSettings.getSubscriptionTimeoutMinutes() > 0)
-        {
-            long subscriptionTimeoutMillis = networkSettings.getSubscriptionTimeoutMinutes() * 60L * 1000L; // Convert to millis.
-            subscriptionManager.setSubscriptionTimeoutMillis(subscriptionTimeoutMillis);
-        }
-    }
-
-    /**
-     * Returns the <code>ServerSettings</code> of the <code>MessageDestination</code>.
-     *
-     * @return The <code>ServerSettings</code> of the <code>MessageDestination</code>.
-     */
-    public ServerSettings getServerSettings()
-    {
-        return serverSettings;
-    }
-
-    /**
-     * Sets the <code>ServerSettings</code> of the <code>MessageDestination</code>.
-     *
-     * @param serverSettings The <code>ServerSettings</code> of the <code>MessageDestination</code>
-     */
-    public void setServerSettings(ServerSettings serverSettings)
-    {
-        this.serverSettings = serverSettings;
-    }
-
-    /**
-     * Casts the <code>Service</code> into <code>MessageService</code>
-     * and calls super.setService.
-     *
-     * @param service The <code>Service</code> managing this <code>Destination</code>.
-     */
-    @Override
-    public void setService(Service service)
-    {
-        MessageService messageService = (MessageService)service;
-        super.setService(messageService);
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Other Public Methods
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     *
-     * Returns a <tt>ConfigMap</tt> of destination properties that the client
-     * needs. This includes properties from <code>super{@link #describeDestination(boolean)}</code>
-     * and it also includes outbound throttling policy that the edge server might need.
-     *
-     * @param onlyReliable Determines whether only reliable destination configuration should be returned.
-     * @return A <tt>ConfigMap</tt> of destination properties that the client needs.
-     */
-    @Override
-    public ConfigMap describeDestination(boolean onlyReliable)
-    {
-        ConfigMap destinationConfig = super.describeDestination(onlyReliable);
-        if (destinationConfig == null)
-            return null;
-
-        if (throttleManager == null)
-            return destinationConfig;
-
-        Policy outboundPolicy = throttleManager.getOutboundPolicy();
-        if (outboundPolicy == null || outboundPolicy == Policy.NONE)
-            return destinationConfig;
-
-        // Add the outbound throttle policy to network properties section as appropriate.
-        ConfigMap properties = destinationConfig.getPropertyAsMap(ConfigurationConstants.PROPERTIES_ELEMENT, null);
-        if (properties == null)
-        {
-            properties = new ConfigMap();
-            destinationConfig.addProperty(ConfigurationConstants.PROPERTIES_ELEMENT, properties);
-        }
-
-        ConfigMap network = properties.getPropertyAsMap(NetworkSettings.NETWORK_ELEMENT, null);
-        if (network == null)
-        {
-            network = new ConfigMap();
-            properties.addProperty(NetworkSettings.NETWORK_ELEMENT, network);
-        }
-
-        ConfigMap throttleOutbound = new ConfigMap();
-        throttleOutbound.addProperty(ThrottleSettings.ELEMENT_POLICY, throttleManager.getOutboundPolicy().toString());
-        network.addProperty(ThrottleSettings.ELEMENT_OUTBOUND, throttleOutbound);
-
-        return destinationConfig;
-    }
-
-
-    public SubscriptionManager getSubscriptionManager()
-    {
-        return subscriptionManager;
-    }
-
-
-    public RemoteSubscriptionManager getRemoteSubscriptionManager()
-    {
-        return remoteSubscriptionManager;
-    }
-
-
-    public ThrottleManager getThrottleManager()
-    {
-        return throttleManager;
-    }
-
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (o instanceof Destination)
-        {
-            Destination d = (Destination)o;
-            String serviceType1 = d.getServiceType();
-            String serviceType2 = getServiceType();
-            if ((serviceType1 == null && serviceType2 == null) || (serviceType1 != null && serviceType1.equals(serviceType2)))
-            {
-                String id1 = d.getId();
-                String id2 = getId();
-                if ((id1 == null && id2 == null) || (id1 != null && id1.equals(id2)))
-                    return true;
-            }
-        }
-        return false;
-    }
-
-
-    @Override
-    public int hashCode()
-    {
-        return (getServiceType() == null ? 0 : getServiceType().hashCode()) * 100003 +
-            (getId() == null ? 0 : getId().hashCode());
-    }
-
-
-    @Override
-    public String toString()
-    {
-        return getServiceType() + "#" + getId();
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Protected/Private Methods
-    //
-    //--------------------------------------------------------------------------
-
-    protected ThrottleManager createThrottleManager()
-    {
-        Service service = getService();
-        if (service == null || service.getMessageBroker() == null)
-            return new ThrottleManager(); // Return the default.
-
-        try
-        {
-            Class<? extends ThrottleManager> throttleManagerClass = service.getMessageBroker().getThrottleManagerClass();
-            Object instance = ClassUtil.createDefaultInstance(throttleManagerClass, null);
-            if (instance instanceof ThrottleManager)
-                return (ThrottleManager)instance;
-        }
-        catch (Throwable t)
-        {
-            // NOWARN
-        }
-
-        return new ThrottleManager(); // Return the default.
-    }
-
-    protected void network(ConfigMap properties)
-    {
-        ConfigMap network = properties.getPropertyAsMap(NetworkSettings.NETWORK_ELEMENT, null);
-        if (network == null)
-            return;
-
-        // Get implementation specific network settings, including subclasses!
-        NetworkSettings ns = getNetworkSettings();
-
-        // Subscriber timeout; first check for subscription-timeout-minutes and fallback to legacy session-timeout.
-        int useLegacyPropertyToken = -999999;
-        int subscriptionTimeoutMinutes = network.getPropertyAsInt(NetworkSettings.SUBSCRIPTION_TIMEOUT_MINUTES, useLegacyPropertyToken);
-        if (subscriptionTimeoutMinutes == useLegacyPropertyToken)
-            subscriptionTimeoutMinutes = network.getPropertyAsInt(NetworkSettings.SESSION_TIMEOUT, NetworkSettings.DEFAULT_TIMEOUT);
-        ns.setSubscriptionTimeoutMinutes(subscriptionTimeoutMinutes);
-
-        // Throttle Settings
-        ThrottleSettings ts = ns.getThrottleSettings();
-        ts.setDestinationName(getId());
-        throttle(ts, network);
-
-        setNetworkSettings(ns);
-    }
-
-    protected void throttle(ThrottleSettings ts, ConfigMap network)
-    {
-        ConfigMap inbound = network.getPropertyAsMap(ThrottleSettings.ELEMENT_INBOUND, null);
-        if (inbound != null)
-        {
-            ThrottleSettings.Policy policy = getPolicyFromThrottleSettings(inbound);
-            ts.setInboundPolicy(policy);
-            int destFreq = inbound.getPropertyAsInt(ThrottleSettings.ELEMENT_DEST_FREQ, 0);
-            ts.setIncomingDestinationFrequency(destFreq);
-            int clientFreq = inbound.getPropertyAsInt(ThrottleSettings.ELEMENT_CLIENT_FREQ, 0);
-            ts.setIncomingClientFrequency(clientFreq);
-        }
-
-        ConfigMap outbound = network.getPropertyAsMap(ThrottleSettings.ELEMENT_OUTBOUND, null);
-        if (outbound != null)
-        {
-            ThrottleSettings.Policy policy = getPolicyFromThrottleSettings(outbound);
-            ts.setOutboundPolicy(policy);
-            int destFreq = outbound.getPropertyAsInt(ThrottleSettings.ELEMENT_DEST_FREQ, 0);
-            ts.setOutgoingDestinationFrequency(destFreq);
-            int clientFreq = outbound.getPropertyAsInt(ThrottleSettings.ELEMENT_CLIENT_FREQ, 0);
-            ts.setOutgoingClientFrequency(clientFreq);
-        }
-    }
-
-    private ThrottleSettings.Policy getPolicyFromThrottleSettings(ConfigMap settings)
-    {
-        String policyString = settings.getPropertyAsString(ThrottleSettings.ELEMENT_POLICY, null);
-        ThrottleSettings.Policy policy = ThrottleSettings.Policy.NONE;
-        if (policyString == null)
-            return policy;
-        try
-        {
-            policy = ThrottleSettings.parsePolicy(policyString);
-        }
-        catch (ConfigurationException exception)
-        {
-            ConfigurationException ce = new ConfigurationException();
-            ce.setMessage(UNSUPPORTED_POLICY, new Object[] {getId(), policyString});
-            throw ce;
-        }
-        return policy;
-    }
-
-    protected void server(ConfigMap properties)
-    {
-        ConfigMap server = properties.getPropertyAsMap(DestinationSettings.SERVER_ELEMENT, null);
-        if (server == null)
-            return;
-
-        long ttl = server.getPropertyAsLong(MessagingConstants.TIME_TO_LIVE_ELEMENT, -1);
-        serverSettings.setMessageTTL(ttl);
-
-        boolean durable = server.getPropertyAsBoolean(MessagingConstants.IS_DURABLE_ELEMENT, false);
-        serverSettings.setDurable(durable);
-
-        boolean allowSubtopics = server.getPropertyAsBoolean(MessagingConstants.ALLOW_SUBTOPICS_ELEMENT, false);
-        serverSettings.setAllowSubtopics(allowSubtopics);
-
-        boolean disallowWildcardSubtopics = server.getPropertyAsBoolean(MessagingConstants.DISALLOW_WILDCARD_SUBTOPICS_ELEMENT, false);
-        serverSettings.setDisallowWildcardSubtopics(disallowWildcardSubtopics);
-
-        int priority = server.getPropertyAsInt(MessagingConstants.MESSAGE_PRIORITY, -1);
-        if (priority != -1)
-            serverSettings.setPriority(priority);
-
-        String subtopicSeparator = server.getPropertyAsString(MessagingConstants.SUBTOPIC_SEPARATOR_ELEMENT, MessagingConstants.DEFAULT_SUBTOPIC_SEPARATOR);
-        serverSettings.setSubtopicSeparator(subtopicSeparator);
-
-        String routingMode = server.getPropertyAsString(MessagingConstants.CLUSTER_MESSAGE_ROUTING, "server-to-server");
-        serverSettings.setBroadcastRoutingMode(routingMode);
-    }
-
-    /**
-     * Returns the log category of the <code>MessageDestination</code>.
-     *
-     * @return The log category of the component.
-     */
-    @Override
-    protected String getLogCategory()
-    {
-        return LOG_CATEGORY;
-    }
-
-    /**
-     * Invoked automatically to allow the <code>MessageDestination</code> to setup its corresponding
-     * MBean control.
-     *
-     * @param service The <code>Service</code> that manages this <code>MessageDestination</code>.
-     */
-    @Override
-    protected void setupDestinationControl(Service service)
-    {
-        controller = new MessageDestinationControl(this, service.getControl());
-        controller.register();
-        setControl(controller);
-        setupThrottleManagerControl(controller);
-        setupSubscriptionManagerControl(controller);
-    }
-
-    protected void setupThrottleManagerControl(MessageDestinationControl destinationControl)
-    {
-        if (throttleManager != null)
-        {
-            ThrottleManagerControl throttleManagerControl = new ThrottleManagerControl(throttleManager, destinationControl);
-            throttleManagerControl.register();
-            throttleManager.setControl(throttleManagerControl);
-            throttleManager.setManaged(true);
-            destinationControl.setThrottleManager(throttleManagerControl.getObjectName());
-        }
-    }
-
-    private void setupSubscriptionManagerControl(MessageDestinationControl destinationControl)
-    {
-        SubscriptionManagerControl subscriptionManagerControl = new SubscriptionManagerControl(getSubscriptionManager(), destinationControl);
-        subscriptionManagerControl.register();
-        getSubscriptionManager().setControl(subscriptionManagerControl);
-        getSubscriptionManager().setManaged(true);
-        destinationControl.setSubscriptionManager(subscriptionManagerControl.getObjectName());
-    }
-}


Mime
View raw message