flex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cd...@apache.org
Subject [09/51] [partial] FLEX-34306 - [BlazeDS] Make the BlazeDS build run on Windows machines - Added some mkdir commands to the ANT Build.java - Did some fine-tuning to resolve some compile errors
Date Mon, 05 May 2014 20:08:23 GMT
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/4f6a3052/modules/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java b/modules/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java
old mode 100755
new mode 100644
index a469d6c..70f1318
--- a/modules/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java
+++ b/modules/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java
@@ -1,603 +1,603 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.endpoints;
-
-import flex.messaging.FlexContext;
-import flex.messaging.FlexSession;
-import flex.messaging.client.FlexClient;
-import flex.messaging.client.FlushResult;
-import flex.messaging.client.PollFlushResult;
-import flex.messaging.client.PollWaitListener;
-import flex.messaging.client.UserAgentSettings;
-import flex.messaging.config.ConfigMap;
-import flex.messaging.config.ConfigurationConstants;
-import flex.messaging.log.Log;
-import flex.messaging.messages.CommandMessage;
-import flex.messaging.util.UserAgentManager;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Base class for HTTP-based endpoints that support regular polling and long polling,
- * which means placing request threads that are polling for messages into a wait
- * state until messages are available for delivery or the configurable wait interval
- * is reached.
- */
-public abstract class BasePollingHTTPEndpoint extends BaseHTTPEndpoint implements PollWaitListener
-{
-    //--------------------------------------------------------------------------
-    //
-    // Private Static Constants
-    //
-    //--------------------------------------------------------------------------
-
-    private static final String POLLING_ENABLED = "polling-enabled";
-    private static final String POLLING_INTERVAL_MILLIS = "polling-interval-millis";
-    private static final String POLLING_INTERVAL_SECONDS = "polling-interval-seconds"; // Deprecated configuration option.
-    private static final String MAX_WAITING_POLL_REQUESTS = "max-waiting-poll-requests";
-    private static final String WAIT_INTERVAL_MILLIS = "wait-interval-millis";
-    private static final String CLIENT_WAIT_INTERVAL_MILLIS = "client-wait-interval-millis";
-    // Force clients that exceed the long-poll limit to wait at least this long between poll requests.
-    // This matches the default polling interval defined in the client PollingChannel.
-    private static final int DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS = 3000;
-
-    // User Agent based settings manager
-    private UserAgentManager userAgentManager = new UserAgentManager();
-
-    //--------------------------------------------------------------------------
-    //
-    // Constructor
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Constructs an unmanaged <code>BasePollingHTTPEndpoint</code>.
-     */
-    public BasePollingHTTPEndpoint()
-    {
-        this(false);
-    }
-
-    /**
-     * Constructs an <code>BasePollingHTTPEndpoint</code> with the indicated management.
-     *
-     * @param enableManagement <code>true</code> if the <code>BasePollingHTTPEndpoint</code>
-     * is manageable; <code>false</code> otherwise.
-     */
-    public BasePollingHTTPEndpoint(boolean enableManagement)
-    {
-        super(enableManagement);
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Initialize, validate, start, and stop methods.
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Initializes the <code>Endpoint</code> with the properties.
-     * If subclasses override this method, they must call <code>super.initialize()</code>.
-     *
-     * @param id The ID of the <code>Endpoint</code>.
-     * @param properties Properties for the <code>Endpoint</code>.
-     */
-    @Override
-    public void initialize(String id, ConfigMap properties)
-    {
-        super.initialize(id, properties);
-
-        if (properties == null || properties.size() == 0)
-        {
-            // Initialize default user agent manager settings.
-            UserAgentManager.setupUserAgentManager(null, userAgentManager);
-
-            return; // Nothing else to initialize.
-        }
-
-        // General poll props.
-        pollingEnabled = properties.getPropertyAsBoolean(POLLING_ENABLED, false);
-        pollingIntervalMillis = properties.getPropertyAsLong(POLLING_INTERVAL_MILLIS, -1);
-        long pollingIntervalSeconds = properties.getPropertyAsLong(POLLING_INTERVAL_SECONDS, -1); // Deprecated
-        if (pollingIntervalSeconds > -1)
-            pollingIntervalMillis = pollingIntervalSeconds * 1000;
-
-        // Piggybacking props.
-        piggybackingEnabled = properties.getPropertyAsBoolean(ConfigurationConstants.PIGGYBACKING_ENABLED_ELEMENT, false);
-
-        // HTTP poll wait props.
-        maxWaitingPollRequests = properties.getPropertyAsInt(MAX_WAITING_POLL_REQUESTS, 0);
-        waitInterval = properties.getPropertyAsLong(WAIT_INTERVAL_MILLIS, 0);
-        clientWaitInterval = properties.getPropertyAsInt(CLIENT_WAIT_INTERVAL_MILLIS, 0);
-
-        // User Agent props.
-        UserAgentManager.setupUserAgentManager(properties, userAgentManager);
-
-        // Set initial state for the canWait flag based on whether we allow waits or not.
-        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval > 0))
-        {
-            waitEnabled = true;
-            canWait = true;
-        }
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Variables
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * This flag is volatile to allow for consistent reads across thread without
-     * needing to pay the cost for a synchronized lock for each read.
-     */
-    private volatile boolean canWait;
-
-    /**
-     * Used to synchronize sets and gets to the number of waiting clients.
-     */
-    protected final Object lock = new Object();
-
-    /**
-     * Set when properties are handled; used as a shortcut for logging to determine whether this
-     * instance attempts to put request threads in a wait state or not.
-     */
-    private boolean waitEnabled;
-
-    /**
-     * A count of the number of request threads that are currently in the wait state (including
-     * those on their way into or out of it).
-     */
-    protected int waitingPollRequestsCount;
-
-    /**
-     * A Map(notification Object for a waited request thread, Boolean.TRUE).
-     */
-    private ConcurrentHashMap currentWaitedRequests;
-
-    //--------------------------------------------------------------------------
-    //
-    // Properties
-    //
-    //--------------------------------------------------------------------------
-
-    //----------------------------------
-    //  clientWaitInterval
-    //----------------------------------
-
-    protected int clientWaitInterval = 0;
-
-    /**
-     * Retrieves the number of milliseconds the client will wait after receiving a response for
-     * a poll with server wait before it issues its next poll request.
-     * A value of zero or less causes the client to use its default polling interval (based on the
-     * channel's polling-interval-millis configuration) and this value is ignored.
-     * A value greater than zero will cause the client to wait for the specified interval before
-     * issuing its next poll request with a value of 1 triggering an immediate poll from the client
-     * as soon as a waited poll response is received.
-     * @return The client wait interval.
-     */
-    public int getClientWaitInterval()
-    {
-        return clientWaitInterval;
-    }
-
-    /**
-     * Sets the number of milliseconds a client will wait after receiving a response for a poll
-     * with server wait before it issues its next poll request.
-     * A value of zero or less causes the client to use its default polling interval (based on the
-     * channel's polling-interval-millis configuration) and this value is ignored.
-     * A value greater than zero will cause the client to wait for the specified interval before
-     * issuing its next poll request with a value of 1 triggering an immediate poll from the client
-     * as soon as a waited poll response is received.
-     * This property does not effect polling clients that poll the server without a server wait.
-     *
-     * @param value The number of milliseconds a client will wait before issuing its next poll when the
-     *        server is configured to wait.
-     */
-    public void setClientWaitInterval(int value)
-    {
-        clientWaitInterval = value;
-    }
-
-    //----------------------------------
-    //  maxWaitingPollRequests
-    //----------------------------------
-
-    protected int maxWaitingPollRequests = 0;
-
-    /**
-     * Retrieves the maximum number of server poll response threads that will be
-     * waiting for messages to arrive for clients.
-     * @return The maximum number of waiting poll requests.
-     */
-    public int getMaxWaitingPollRequests()
-    {
-        return maxWaitingPollRequests;
-    }
-
-    /**
-     * Sets the maximum number of server poll response threads that will be
-     * waiting for messages to arrive for clients.
-     *
-     * @param maxWaitingPollRequests The maximum number of server poll response threads
-     * that will be waiting for messages to arrive for the client.
-     */
-    public void setMaxWaitingPollRequests(int maxWaitingPollRequests)
-    {
-        this.maxWaitingPollRequests = maxWaitingPollRequests;
-        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval > 0))
-        {
-            waitEnabled = true;
-            canWait = (waitingPollRequestsCount < maxWaitingPollRequests);
-        }
-    }
-
-    //----------------------------------
-    //  pollingEnabled
-    //----------------------------------
-
-    /**
-     * @exclude
-     * This is a property used on the client.
-     */
-    protected boolean piggybackingEnabled;
-
-    //----------------------------------
-    //  pollingEnabled
-    //----------------------------------
-
-    /**
-     * @exclude
-     * This is a property used on the client.
-     */
-    protected boolean pollingEnabled;
-
-    //----------------------------------
-    //  pollingIntervalMillis
-    //----------------------------------
-
-    /**
-     * @exclude
-     * This is a property used on the client.
-     */
-    protected long pollingIntervalMillis = -1;
-
-    //----------------------------------
-    //  waitInterval
-    //----------------------------------
-
-    protected long waitInterval = 0;
-
-    /**
-     * Retrieves the number of milliseconds the server poll response thread will be
-     * waiting for messages to arrive for the client.
-     * @return The wait interval.
-     */
-    public long getWaitInterval()
-    {
-        return waitInterval;
-    }
-
-    /**
-     * Sets the number of milliseconds the server poll response thread will be
-     * waiting for messages to arrive for the client.
-     *
-     * @param waitInterval The number of milliseconds the server poll response thread will be
-     * waiting for messages to arrive for the client.
-     */
-    public void setWaitInterval(long waitInterval)
-    {
-        this.waitInterval = waitInterval;
-        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval > 0))
-        {
-            waitEnabled = true;
-            canWait = (waitingPollRequestsCount < maxWaitingPollRequests);
-        }
-    }
-
-    //----------------------------------
-    //  waitingPollRequestsCount
-    //----------------------------------
-
-    /**
-     * Retrieves the count of the number of request threads that are currently in the wait state
-     * (including those on their way into or out of it).
-     *
-     * @return The count of the number of request threads that are currently in the wait state.
-     */
-    public int getWaitingPollRequestsCount()
-    {
-        return waitingPollRequestsCount;
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Public Methods
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * @exclude
-     * Returns a <code>ConfigMap</code> of endpoint properties that the client
-     * needs. This includes properties from <code>super.describeEndpoint</code>
-     * and additional <code>BaseHTTPEndpoint</code> specific properties under
-     * "properties" key.
-     */
-    @Override
-    public ConfigMap describeEndpoint()
-    {
-        ConfigMap endpointConfig = super.describeEndpoint();
-
-        boolean createdProperties = false;
-        ConfigMap properties = endpointConfig.getPropertyAsMap(PROPERTIES_ELEMENT, null);
-        if (properties == null)
-        {
-            properties = new ConfigMap();
-            createdProperties = true;
-        }
-
-        if (pollingEnabled)
-        {
-            ConfigMap pollingEnabled = new ConfigMap();
-            // Adding as a value rather than attribute to the parent
-            pollingEnabled.addProperty(EMPTY_STRING, TRUE_STRING);
-            properties.addProperty(POLLING_ENABLED, pollingEnabled);
-        }
-
-        if (pollingIntervalMillis > -1)
-        {
-            ConfigMap pollingInterval = new ConfigMap();
-            // Adding as a value rather than attribute to the parent
-            pollingInterval.addProperty(EMPTY_STRING, String.valueOf(pollingIntervalMillis));
-            properties.addProperty(POLLING_INTERVAL_MILLIS, pollingInterval);
-        }
-
-        if (piggybackingEnabled)
-        {
-            ConfigMap piggybackingEnabled = new ConfigMap();
-            // Adding as a value rather than attribute to the parent
-            piggybackingEnabled.addProperty(EMPTY_STRING, String.valueOf(piggybackingEnabled));
-            properties.addProperty(ConfigurationConstants.PIGGYBACKING_ENABLED_ELEMENT, piggybackingEnabled);
-        }
-
-        if (createdProperties && properties.size() > 0)
-            endpointConfig.addProperty(ConfigurationConstants.PROPERTIES_ELEMENT, properties);
-
-        return endpointConfig;
-    }
-
-    /**
-     * Sets up monitoring of waited poll requests so they can be notified and exit when the
-     * endpoint stops.
-     *
-     * @see flex.messaging.endpoints.AbstractEndpoint#start()
-     */
-    @Override
-    public void start()
-    {
-        if (isStarted())
-            return;
-
-        super.start();
-
-        currentWaitedRequests = new ConcurrentHashMap();
-    }
-
-    /**
-     * Ensures that no poll requests in a wait state are left un-notified when the endpoint stops.
-     *
-     * @see flex.messaging.endpoints.AbstractEndpoint#stop()
-     */
-    @Override
-    public void stop()
-    {
-        if (!isStarted())
-            return;
-
-        // Notify any currently waiting polls.
-        for (Object notifier : currentWaitedRequests.keySet())
-        {
-            synchronized (notifier)
-            {
-                notifier.notifyAll(); // Break any current waits.
-            }
-        }
-        currentWaitedRequests = null;
-
-        super.stop();
-    }
-
-    /**
-     * (non-Javaodc)
-     * @see flex.messaging.client.PollWaitListener#waitStart(Object)
-     */
-    public void waitStart(Object notifier)
-    {
-        currentWaitedRequests.put(notifier, Boolean.TRUE);
-    }
-
-    /**
-     * (non-Javaodc)
-     * @see flex.messaging.client.PollWaitListener#waitEnd(Object)
-     */
-    public void waitEnd(Object notifier)
-    {
-        if (currentWaitedRequests != null)
-            currentWaitedRequests.remove(notifier);
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Protected Methods
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Overrides the base poll handling to support optionally putting Http request handling threads
-     * into a wait state until messages are available to be delivered in the poll response or a timeout is reached.
-     * The number of threads that may be put in a wait state is bounded by <code>max-waiting-poll-requests</code>
-     * and waits will only be attempted if the canWait flag that is based on the <code>max-waiting-poll-requests</code>
-     * and the specified <code>wait-interval</code> is true.
-     *
-     * @param flexClient The FlexClient that issued the poll request.
-     * @param pollCommand The poll command from the client.
-     * @return The flush info used to build the poll response.
-     */
-    @Override
-    protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand)
-    {
-        FlushResult flushResult = null;
-        if (canWait && !pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER))
-        {
-            FlexSession session = FlexContext.getFlexSession();
-            // If canWait is true it means we currently have less than the max number of allowed waiting threads.
-
-            // We need to protect writes/reads to the wait count with the endpoint's lock.
-            // Also, we have to be careful to handle the case where two threads get to this point when only
-            // one wait spot remains; one thread will win and the other needs to revert to a non-waitable poll.
-            boolean thisThreadCanWait;
-            synchronized (lock)
-            {
-                ++waitingPollRequestsCount;
-                if (waitingPollRequestsCount == maxWaitingPollRequests)
-                {
-                    thisThreadCanWait = true; // This thread got the last wait spot.
-                    canWait = false;
-                }
-                else if (waitingPollRequestsCount > maxWaitingPollRequests)
-                {
-                    thisThreadCanWait = false; // This thread was beaten out for the last spot.
-                    --waitingPollRequestsCount; // Decrement the count because we're not going to try a poll with wait.
-                    canWait = false; // All the wait spots are currently occupied so prevent further attempts for now.
-                }
-                else
-                {
-                    // We haven't hit the limit yet, allow this thread to wait.
-                    thisThreadCanWait = true;
-                }
-            }
-
-            // Check the max waiting connections per session count
-            if (thisThreadCanWait)
-            {
-                String userAgentValue = FlexContext.getHttpRequest().getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
-                UserAgentSettings agentSettings = userAgentManager.match(userAgentValue);
-                synchronized(session)
-                {
-                    if (agentSettings != null)
-                        session.maxConnectionsPerSession = agentSettings.getMaxPersistentConnectionsPerSession();
-
-                    ++session.streamingConnectionsCount;
-                    if (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
-                            || session.streamingConnectionsCount <= session.maxConnectionsPerSession)
-                    {
-                        thisThreadCanWait = true; // We haven't hit the limit yet, allow the wait.
-                    }
-                    else // (session.streamingConnectionsCount > session.maxConnectionsPerSession)
-                    {
-                        thisThreadCanWait = false; // no more from this client
-                        --session.streamingConnectionsCount;
-                    }
-                }
-
-                if (!thisThreadCanWait)
-                {
-                    // Decrement the waiting poll count, since this poll isn't going to wait.
-                    synchronized (lock)
-                    {
-                        --waitingPollRequestsCount;
-                        if (waitingPollRequestsCount < maxWaitingPollRequests)
-                            canWait = true;
-                    }
-                    if (Log.isDebug())
-                    {
-                        log.debug("Max long-polling requests per session limit (" + session.maxConnectionsPerSession + ") has been reached, this poll won't wait.");
-                    }
-                }
-
-            }
-
-            if (thisThreadCanWait)
-            {
-                if (Log.isDebug())
-                    log.debug("Number of waiting threads for endpoint with id '"+ getId() +"' is " + waitingPollRequestsCount + ".");
-
-                try
-                {
-                    flushResult  = flexClient.pollWithWait(getId(), FlexContext.getFlexSession(), this, waitInterval);
-                    if (flushResult != null)
-                    {
-                        // Prevent busy-polling due to multiple clients sharing a session and swapping each other out too quickly.
-                        if ((flushResult instanceof PollFlushResult) && ((PollFlushResult)flushResult).isAvoidBusyPolling() && (flushResult.getNextFlushWaitTimeMillis() < DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS))
-                        {
-                            // Force the client polling interval to match the default defined in the client PollingChannel.
-                            flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
-                        }
-                        else if ((clientWaitInterval > 0) && (flushResult.getNextFlushWaitTimeMillis() == 0))
-                        {
-                            // If the FlushResult doesn't specify it's own flush wait time, use the configured clientWaitInterval if defined.
-                            flushResult.setNextFlushWaitTimeMillis(clientWaitInterval);
-                        }
-                    }
-                }
-                finally
-                {
-                    // We're done waiting so decrement the count of waiting threads and update the canWait flag if necessary
-                    synchronized (lock)
-                    {
-                        --waitingPollRequestsCount;
-                        if (waitingPollRequestsCount < maxWaitingPollRequests)
-                            canWait = true;
-                    }
-                    synchronized (session)
-                    {
-                        --session.streamingConnectionsCount;
-                    }
-
-                    if (Log.isDebug())
-                        log.debug("Number of waiting threads for endpoint with id '"+ getId() +"' is " + waitingPollRequestsCount + ".");
-                }
-            }
-        }
-        else if (Log.isDebug() && waitEnabled)
-        {
-            if (pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER))
-                log.debug("Suppressing poll wait for this request because it is part of a batch of messages to process.");
-            else
-                log.debug("Max waiting poll requests limit '" + maxWaitingPollRequests + "' has been reached for endpoint '" + getId() + "'. FlexClient with id '"+ flexClient.getId() + "' will poll with no wait.");
-        }
-
-        // If we weren't able to do a poll with wait above for any reason just run the base poll handling logic.
-        if (flushResult == null)
-        {
-            flushResult = super.handleFlexClientPoll(flexClient, pollCommand);
-            // If this is an excess poll request that we couldn't wait on, make sure the client doesn't poll the endpoint too aggressively.
-            // In this case, force a client wait to match the default polling interval defined in the client PollingChannel.
-            if ( waitEnabled && (pollingIntervalMillis < DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS))
-            {
-                if (flushResult == null)
-                {
-                    flushResult = new FlushResult();
-                }
-                flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
-            }
-        }
-
-        return flushResult;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.endpoints;
+
+import flex.messaging.FlexContext;
+import flex.messaging.FlexSession;
+import flex.messaging.client.FlexClient;
+import flex.messaging.client.FlushResult;
+import flex.messaging.client.PollFlushResult;
+import flex.messaging.client.PollWaitListener;
+import flex.messaging.client.UserAgentSettings;
+import flex.messaging.config.ConfigMap;
+import flex.messaging.config.ConfigurationConstants;
+import flex.messaging.log.Log;
+import flex.messaging.messages.CommandMessage;
+import flex.messaging.util.UserAgentManager;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Base class for HTTP-based endpoints that support regular polling and long polling,
+ * which means placing request threads that are polling for messages into a wait
+ * state until messages are available for delivery or the configurable wait interval
+ * is reached.
+ */
+public abstract class BasePollingHTTPEndpoint extends BaseHTTPEndpoint implements PollWaitListener
+{
+    //--------------------------------------------------------------------------
+    //
+    // Private Static Constants
+    //
+    //--------------------------------------------------------------------------
+
+    private static final String POLLING_ENABLED = "polling-enabled";
+    private static final String POLLING_INTERVAL_MILLIS = "polling-interval-millis";
+    private static final String POLLING_INTERVAL_SECONDS = "polling-interval-seconds"; // Deprecated configuration option.
+    private static final String MAX_WAITING_POLL_REQUESTS = "max-waiting-poll-requests";
+    private static final String WAIT_INTERVAL_MILLIS = "wait-interval-millis";
+    private static final String CLIENT_WAIT_INTERVAL_MILLIS = "client-wait-interval-millis";
+    // Force clients that exceed the long-poll limit to wait at least this long between poll requests.
+    // This matches the default polling interval defined in the client PollingChannel.
+    private static final int DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS = 3000;
+
+    // User Agent based settings manager
+    private UserAgentManager userAgentManager = new UserAgentManager();
+
+    //--------------------------------------------------------------------------
+    //
+    // Constructor
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Constructs an unmanaged <code>BasePollingHTTPEndpoint</code>.
+     */
+    public BasePollingHTTPEndpoint()
+    {
+        this(false);
+    }
+
+    /**
+     * Constructs an <code>BasePollingHTTPEndpoint</code> with the indicated management.
+     *
+     * @param enableManagement <code>true</code> if the <code>BasePollingHTTPEndpoint</code>
+     * is manageable; <code>false</code> otherwise.
+     */
+    public BasePollingHTTPEndpoint(boolean enableManagement)
+    {
+        super(enableManagement);
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Initialize, validate, start, and stop methods.
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Initializes the <code>Endpoint</code> with the properties.
+     * If subclasses override this method, they must call <code>super.initialize()</code>.
+     *
+     * @param id The ID of the <code>Endpoint</code>.
+     * @param properties Properties for the <code>Endpoint</code>.
+     */
+    @Override
+    public void initialize(String id, ConfigMap properties)
+    {
+        super.initialize(id, properties);
+
+        if (properties == null || properties.size() == 0)
+        {
+            // Initialize default user agent manager settings.
+            UserAgentManager.setupUserAgentManager(null, userAgentManager);
+
+            return; // Nothing else to initialize.
+        }
+
+        // General poll props.
+        pollingEnabled = properties.getPropertyAsBoolean(POLLING_ENABLED, false);
+        pollingIntervalMillis = properties.getPropertyAsLong(POLLING_INTERVAL_MILLIS, -1);
+        long pollingIntervalSeconds = properties.getPropertyAsLong(POLLING_INTERVAL_SECONDS, -1); // Deprecated
+        if (pollingIntervalSeconds > -1)
+            pollingIntervalMillis = pollingIntervalSeconds * 1000;
+
+        // Piggybacking props.
+        piggybackingEnabled = properties.getPropertyAsBoolean(ConfigurationConstants.PIGGYBACKING_ENABLED_ELEMENT, false);
+
+        // HTTP poll wait props.
+        maxWaitingPollRequests = properties.getPropertyAsInt(MAX_WAITING_POLL_REQUESTS, 0);
+        waitInterval = properties.getPropertyAsLong(WAIT_INTERVAL_MILLIS, 0);
+        clientWaitInterval = properties.getPropertyAsInt(CLIENT_WAIT_INTERVAL_MILLIS, 0);
+
+        // User Agent props.
+        UserAgentManager.setupUserAgentManager(properties, userAgentManager);
+
+        // Set initial state for the canWait flag based on whether we allow waits or not.
+        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval > 0))
+        {
+            waitEnabled = true;
+            canWait = true;
+        }
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * This flag is volatile to allow for consistent reads across thread without
+     * needing to pay the cost for a synchronized lock for each read.
+     */
+    private volatile boolean canWait;
+
+    /**
+     * Used to synchronize sets and gets to the number of waiting clients.
+     */
+    protected final Object lock = new Object();
+
+    /**
+     * Set when properties are handled; used as a shortcut for logging to determine whether this
+     * instance attempts to put request threads in a wait state or not.
+     */
+    private boolean waitEnabled;
+
+    /**
+     * A count of the number of request threads that are currently in the wait state (including
+     * those on their way into or out of it).
+     */
+    protected int waitingPollRequestsCount;
+
+    /**
+     * A Map(notification Object for a waited request thread, Boolean.TRUE).
+     */
+    private ConcurrentHashMap currentWaitedRequests;
+
+    //--------------------------------------------------------------------------
+    //
+    // Properties
+    //
+    //--------------------------------------------------------------------------
+
+    //----------------------------------
+    //  clientWaitInterval
+    //----------------------------------
+
+    protected int clientWaitInterval = 0;
+
+    /**
+     * Retrieves the number of milliseconds the client will wait after receiving a response for
+     * a poll with server wait before it issues its next poll request.
+     * A value of zero or less causes the client to use its default polling interval (based on the
+     * channel's polling-interval-millis configuration) and this value is ignored.
+     * A value greater than zero will cause the client to wait for the specified interval before
+     * issuing its next poll request with a value of 1 triggering an immediate poll from the client
+     * as soon as a waited poll response is received.
+     * @return The client wait interval.
+     */
+    public int getClientWaitInterval()
+    {
+        return clientWaitInterval;
+    }
+
+    /**
+     * Sets the number of milliseconds a client will wait after receiving a response for a poll
+     * with server wait before it issues its next poll request.
+     * A value of zero or less causes the client to use its default polling interval (based on the
+     * channel's polling-interval-millis configuration) and this value is ignored.
+     * A value greater than zero will cause the client to wait for the specified interval before
+     * issuing its next poll request with a value of 1 triggering an immediate poll from the client
+     * as soon as a waited poll response is received.
+     * This property does not effect polling clients that poll the server without a server wait.
+     *
+     * @param value The number of milliseconds a client will wait before issuing its next poll when the
+     *        server is configured to wait.
+     */
+    public void setClientWaitInterval(int value)
+    {
+        clientWaitInterval = value;
+    }
+
+    //----------------------------------
+    //  maxWaitingPollRequests
+    //----------------------------------
+
+    protected int maxWaitingPollRequests = 0;
+
+    /**
+     * Retrieves the maximum number of server poll response threads that will be
+     * waiting for messages to arrive for clients.
+     * @return The maximum number of waiting poll requests.
+     */
+    public int getMaxWaitingPollRequests()
+    {
+        return maxWaitingPollRequests;
+    }
+
+    /**
+     * Sets the maximum number of server poll response threads that will be
+     * waiting for messages to arrive for clients.
+     *
+     * @param maxWaitingPollRequests The maximum number of server poll response threads
+     * that will be waiting for messages to arrive for the client.
+     */
+    public void setMaxWaitingPollRequests(int maxWaitingPollRequests)
+    {
+        this.maxWaitingPollRequests = maxWaitingPollRequests;
+        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval > 0))
+        {
+            waitEnabled = true;
+            canWait = (waitingPollRequestsCount < maxWaitingPollRequests);
+        }
+    }
+
+    //----------------------------------
+    //  pollingEnabled
+    //----------------------------------
+
+    /**
+     * @exclude
+     * This is a property used on the client.
+     */
+    protected boolean piggybackingEnabled;
+
+    //----------------------------------
+    //  pollingEnabled
+    //----------------------------------
+
+    /**
+     * @exclude
+     * This is a property used on the client.
+     */
+    protected boolean pollingEnabled;
+
+    //----------------------------------
+    //  pollingIntervalMillis
+    //----------------------------------
+
+    /**
+     * @exclude
+     * This is a property used on the client.
+     */
+    protected long pollingIntervalMillis = -1;
+
+    //----------------------------------
+    //  waitInterval
+    //----------------------------------
+
+    protected long waitInterval = 0;
+
+    /**
+     * Retrieves the number of milliseconds the server poll response thread will be
+     * waiting for messages to arrive for the client.
+     * @return The wait interval.
+     */
+    public long getWaitInterval()
+    {
+        return waitInterval;
+    }
+
+    /**
+     * Sets the number of milliseconds the server poll response thread will be
+     * waiting for messages to arrive for the client.
+     *
+     * @param waitInterval The number of milliseconds the server poll response thread will be
+     * waiting for messages to arrive for the client.
+     */
+    public void setWaitInterval(long waitInterval)
+    {
+        this.waitInterval = waitInterval;
+        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval > 0))
+        {
+            waitEnabled = true;
+            canWait = (waitingPollRequestsCount < maxWaitingPollRequests);
+        }
+    }
+
+    //----------------------------------
+    //  waitingPollRequestsCount
+    //----------------------------------
+
+    /**
+     * Retrieves the count of the number of request threads that are currently in the wait state
+     * (including those on their way into or out of it).
+     *
+     * @return The count of the number of request threads that are currently in the wait state.
+     */
+    public int getWaitingPollRequestsCount()
+    {
+        return waitingPollRequestsCount;
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Public Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * @exclude
+     * Returns a <code>ConfigMap</code> of endpoint properties that the client
+     * needs. This includes properties from <code>super.describeEndpoint</code>
+     * and additional <code>BaseHTTPEndpoint</code> specific properties under
+     * "properties" key.
+     */
+    @Override
+    public ConfigMap describeEndpoint()
+    {
+        ConfigMap endpointConfig = super.describeEndpoint();
+
+        boolean createdProperties = false;
+        ConfigMap properties = endpointConfig.getPropertyAsMap(PROPERTIES_ELEMENT, null);
+        if (properties == null)
+        {
+            properties = new ConfigMap();
+            createdProperties = true;
+        }
+
+        if (pollingEnabled)
+        {
+            ConfigMap pollingEnabled = new ConfigMap();
+            // Adding as a value rather than attribute to the parent
+            pollingEnabled.addProperty(EMPTY_STRING, TRUE_STRING);
+            properties.addProperty(POLLING_ENABLED, pollingEnabled);
+        }
+
+        if (pollingIntervalMillis > -1)
+        {
+            ConfigMap pollingInterval = new ConfigMap();
+            // Adding as a value rather than attribute to the parent
+            pollingInterval.addProperty(EMPTY_STRING, String.valueOf(pollingIntervalMillis));
+            properties.addProperty(POLLING_INTERVAL_MILLIS, pollingInterval);
+        }
+
+        if (piggybackingEnabled)
+        {
+            ConfigMap piggybackingEnabled = new ConfigMap();
+            // Adding as a value rather than attribute to the parent
+            piggybackingEnabled.addProperty(EMPTY_STRING, String.valueOf(piggybackingEnabled));
+            properties.addProperty(ConfigurationConstants.PIGGYBACKING_ENABLED_ELEMENT, piggybackingEnabled);
+        }
+
+        if (createdProperties && properties.size() > 0)
+            endpointConfig.addProperty(ConfigurationConstants.PROPERTIES_ELEMENT, properties);
+
+        return endpointConfig;
+    }
+
+    /**
+     * Sets up monitoring of waited poll requests so they can be notified and exit when the
+     * endpoint stops.
+     *
+     * @see flex.messaging.endpoints.AbstractEndpoint#start()
+     */
+    @Override
+    public void start()
+    {
+        if (isStarted())
+            return;
+
+        super.start();
+
+        currentWaitedRequests = new ConcurrentHashMap();
+    }
+
+    /**
+     * Ensures that no poll requests in a wait state are left un-notified when the endpoint stops.
+     *
+     * @see flex.messaging.endpoints.AbstractEndpoint#stop()
+     */
+    @Override
+    public void stop()
+    {
+        if (!isStarted())
+            return;
+
+        // Notify any currently waiting polls.
+        for (Object notifier : currentWaitedRequests.keySet())
+        {
+            synchronized (notifier)
+            {
+                notifier.notifyAll(); // Break any current waits.
+            }
+        }
+        currentWaitedRequests = null;
+
+        super.stop();
+    }
+
+    /**
+     * (non-Javaodc)
+     * @see flex.messaging.client.PollWaitListener#waitStart(Object)
+     */
+    public void waitStart(Object notifier)
+    {
+        currentWaitedRequests.put(notifier, Boolean.TRUE);
+    }
+
+    /**
+     * (non-Javaodc)
+     * @see flex.messaging.client.PollWaitListener#waitEnd(Object)
+     */
+    public void waitEnd(Object notifier)
+    {
+        if (currentWaitedRequests != null)
+            currentWaitedRequests.remove(notifier);
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Protected Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Overrides the base poll handling to support optionally putting Http request handling threads
+     * into a wait state until messages are available to be delivered in the poll response or a timeout is reached.
+     * The number of threads that may be put in a wait state is bounded by <code>max-waiting-poll-requests</code>
+     * and waits will only be attempted if the canWait flag that is based on the <code>max-waiting-poll-requests</code>
+     * and the specified <code>wait-interval</code> is true.
+     *
+     * @param flexClient The FlexClient that issued the poll request.
+     * @param pollCommand The poll command from the client.
+     * @return The flush info used to build the poll response.
+     */
+    @Override
+    protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand)
+    {
+        FlushResult flushResult = null;
+        if (canWait && !pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER))
+        {
+            FlexSession session = FlexContext.getFlexSession();
+            // If canWait is true it means we currently have less than the max number of allowed waiting threads.
+
+            // We need to protect writes/reads to the wait count with the endpoint's lock.
+            // Also, we have to be careful to handle the case where two threads get to this point when only
+            // one wait spot remains; one thread will win and the other needs to revert to a non-waitable poll.
+            boolean thisThreadCanWait;
+            synchronized (lock)
+            {
+                ++waitingPollRequestsCount;
+                if (waitingPollRequestsCount == maxWaitingPollRequests)
+                {
+                    thisThreadCanWait = true; // This thread got the last wait spot.
+                    canWait = false;
+                }
+                else if (waitingPollRequestsCount > maxWaitingPollRequests)
+                {
+                    thisThreadCanWait = false; // This thread was beaten out for the last spot.
+                    --waitingPollRequestsCount; // Decrement the count because we're not going to try a poll with wait.
+                    canWait = false; // All the wait spots are currently occupied so prevent further attempts for now.
+                }
+                else
+                {
+                    // We haven't hit the limit yet, allow this thread to wait.
+                    thisThreadCanWait = true;
+                }
+            }
+
+            // Check the max waiting connections per session count
+            if (thisThreadCanWait)
+            {
+                String userAgentValue = FlexContext.getHttpRequest().getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
+                UserAgentSettings agentSettings = userAgentManager.match(userAgentValue);
+                synchronized(session)
+                {
+                    if (agentSettings != null)
+                        session.maxConnectionsPerSession = agentSettings.getMaxPersistentConnectionsPerSession();
+
+                    ++session.streamingConnectionsCount;
+                    if (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
+                            || session.streamingConnectionsCount <= session.maxConnectionsPerSession)
+                    {
+                        thisThreadCanWait = true; // We haven't hit the limit yet, allow the wait.
+                    }
+                    else // (session.streamingConnectionsCount > session.maxConnectionsPerSession)
+                    {
+                        thisThreadCanWait = false; // no more from this client
+                        --session.streamingConnectionsCount;
+                    }
+                }
+
+                if (!thisThreadCanWait)
+                {
+                    // Decrement the waiting poll count, since this poll isn't going to wait.
+                    synchronized (lock)
+                    {
+                        --waitingPollRequestsCount;
+                        if (waitingPollRequestsCount < maxWaitingPollRequests)
+                            canWait = true;
+                    }
+                    if (Log.isDebug())
+                    {
+                        log.debug("Max long-polling requests per session limit (" + session.maxConnectionsPerSession + ") has been reached, this poll won't wait.");
+                    }
+                }
+
+            }
+
+            if (thisThreadCanWait)
+            {
+                if (Log.isDebug())
+                    log.debug("Number of waiting threads for endpoint with id '"+ getId() +"' is " + waitingPollRequestsCount + ".");
+
+                try
+                {
+                    flushResult  = flexClient.pollWithWait(getId(), FlexContext.getFlexSession(), this, waitInterval);
+                    if (flushResult != null)
+                    {
+                        // Prevent busy-polling due to multiple clients sharing a session and swapping each other out too quickly.
+                        if ((flushResult instanceof PollFlushResult) && ((PollFlushResult)flushResult).isAvoidBusyPolling() && (flushResult.getNextFlushWaitTimeMillis() < DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS))
+                        {
+                            // Force the client polling interval to match the default defined in the client PollingChannel.
+                            flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
+                        }
+                        else if ((clientWaitInterval > 0) && (flushResult.getNextFlushWaitTimeMillis() == 0))
+                        {
+                            // If the FlushResult doesn't specify it's own flush wait time, use the configured clientWaitInterval if defined.
+                            flushResult.setNextFlushWaitTimeMillis(clientWaitInterval);
+                        }
+                    }
+                }
+                finally
+                {
+                    // We're done waiting so decrement the count of waiting threads and update the canWait flag if necessary
+                    synchronized (lock)
+                    {
+                        --waitingPollRequestsCount;
+                        if (waitingPollRequestsCount < maxWaitingPollRequests)
+                            canWait = true;
+                    }
+                    synchronized (session)
+                    {
+                        --session.streamingConnectionsCount;
+                    }
+
+                    if (Log.isDebug())
+                        log.debug("Number of waiting threads for endpoint with id '"+ getId() +"' is " + waitingPollRequestsCount + ".");
+                }
+            }
+        }
+        else if (Log.isDebug() && waitEnabled)
+        {
+            if (pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER))
+                log.debug("Suppressing poll wait for this request because it is part of a batch of messages to process.");
+            else
+                log.debug("Max waiting poll requests limit '" + maxWaitingPollRequests + "' has been reached for endpoint '" + getId() + "'. FlexClient with id '"+ flexClient.getId() + "' will poll with no wait.");
+        }
+
+        // If we weren't able to do a poll with wait above for any reason just run the base poll handling logic.
+        if (flushResult == null)
+        {
+            flushResult = super.handleFlexClientPoll(flexClient, pollCommand);
+            // If this is an excess poll request that we couldn't wait on, make sure the client doesn't poll the endpoint too aggressively.
+            // In this case, force a client wait to match the default polling interval defined in the client PollingChannel.
+            if ( waitEnabled && (pollingIntervalMillis < DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS))
+            {
+                if (flushResult == null)
+                {
+                    flushResult = new FlushResult();
+                }
+                flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
+            }
+        }
+
+        return flushResult;
+    }
+}


Mime
View raw message