flex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cd...@apache.org
Subject [01/51] [partial] flex-blazeds git commit: Removed legacy directories and made the content of the modules directory the new root - Please use the maven build for now as the Ant build will no longer work untill it is adjusted to the new directory structur
Date Sun, 20 Dec 2015 13:13:41 GMT
Repository: flex-blazeds
Updated Branches:
  refs/heads/develop e1ad55788 -> bf2e1dc9b


http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/bf2e1dc9/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java
----------------------------------------------------------------------
diff --git a/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java b/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java
new file mode 100644
index 0000000..3c29fae
--- /dev/null
+++ b/core/src/flex/messaging/endpoints/BasePollingHTTPEndpoint.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.endpoints;
+
+import flex.messaging.FlexContext;
+import flex.messaging.FlexSession;
+import flex.messaging.client.FlexClient;
+import flex.messaging.client.FlushResult;
+import flex.messaging.client.PollFlushResult;
+import flex.messaging.client.PollWaitListener;
+import flex.messaging.client.UserAgentSettings;
+import flex.messaging.config.ConfigMap;
+import flex.messaging.config.ConfigurationConstants;
+import flex.messaging.log.Log;
+import flex.messaging.messages.CommandMessage;
+import flex.messaging.util.UserAgentManager;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Base class for HTTP-based endpoints that support regular polling and long polling,
+ * which means placing request threads that are polling for messages into a wait
+ * state until messages are available for delivery or the configurable wait interval
+ * is reached.
+ */
+public abstract class BasePollingHTTPEndpoint extends BaseHTTPEndpoint implements PollWaitListener
+{
+    //--------------------------------------------------------------------------
+    //
+    // Private Static Constants
+    //
+    //--------------------------------------------------------------------------
+
+    private static final String POLLING_ENABLED = "polling-enabled";
+    private static final String POLLING_INTERVAL_MILLIS = "polling-interval-millis";
+    private static final String POLLING_INTERVAL_SECONDS = "polling-interval-seconds"; //
Deprecated configuration option.
+    private static final String MAX_WAITING_POLL_REQUESTS = "max-waiting-poll-requests";
+    private static final String WAIT_INTERVAL_MILLIS = "wait-interval-millis";
+    private static final String CLIENT_WAIT_INTERVAL_MILLIS = "client-wait-interval-millis";
+    // Force clients that exceed the long-poll limit to wait at least this long between poll
requests.
+    // This matches the default polling interval defined in the client PollingChannel.
+    private static final int DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS = 3000;
+
+    // User Agent based settings manager
+    private UserAgentManager userAgentManager = new UserAgentManager();
+
+    //--------------------------------------------------------------------------
+    //
+    // Constructor
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Constructs an unmanaged <code>BasePollingHTTPEndpoint</code>.
+     */
+    public BasePollingHTTPEndpoint()
+    {
+        this(false);
+    }
+
+    /**
+     * Constructs an <code>BasePollingHTTPEndpoint</code> with the indicated
management.
+     *
+     * @param enableManagement <code>true</code> if the <code>BasePollingHTTPEndpoint</code>
+     * is manageable; <code>false</code> otherwise.
+     */
+    public BasePollingHTTPEndpoint(boolean enableManagement)
+    {
+        super(enableManagement);
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Initialize, validate, start, and stop methods.
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Initializes the <code>Endpoint</code> with the properties.
+     * If subclasses override this method, they must call <code>super.initialize()</code>.
+     *
+     * @param id The ID of the <code>Endpoint</code>.
+     * @param properties Properties for the <code>Endpoint</code>.
+     */
+    @Override
+    public void initialize(String id, ConfigMap properties)
+    {
+        super.initialize(id, properties);
+
+        if (properties == null || properties.size() == 0)
+        {
+            // Initialize default user agent manager settings.
+            UserAgentManager.setupUserAgentManager(null, userAgentManager);
+
+            return; // Nothing else to initialize.
+        }
+
+        // General poll props.
+        pollingEnabled = properties.getPropertyAsBoolean(POLLING_ENABLED, false);
+        pollingIntervalMillis = properties.getPropertyAsLong(POLLING_INTERVAL_MILLIS, -1);
+        long pollingIntervalSeconds = properties.getPropertyAsLong(POLLING_INTERVAL_SECONDS,
-1); // Deprecated
+        if (pollingIntervalSeconds > -1)
+            pollingIntervalMillis = pollingIntervalSeconds * 1000;
+
+        // Piggybacking props.
+        piggybackingEnabled = properties.getPropertyAsBoolean(ConfigurationConstants.PIGGYBACKING_ENABLED_ELEMENT,
false);
+
+        // HTTP poll wait props.
+        maxWaitingPollRequests = properties.getPropertyAsInt(MAX_WAITING_POLL_REQUESTS, 0);
+        waitInterval = properties.getPropertyAsLong(WAIT_INTERVAL_MILLIS, 0);
+        clientWaitInterval = properties.getPropertyAsInt(CLIENT_WAIT_INTERVAL_MILLIS, 0);
+
+        // User Agent props.
+        UserAgentManager.setupUserAgentManager(properties, userAgentManager);
+
+        // Set initial state for the canWait flag based on whether we allow waits or not.
+        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval
> 0))
+        {
+            waitEnabled = true;
+            canWait = true;
+        }
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Variables
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * This flag is volatile to allow for consistent reads across thread without
+     * needing to pay the cost for a synchronized lock for each read.
+     */
+    private volatile boolean canWait;
+
+    /**
+     * Used to synchronize sets and gets to the number of waiting clients.
+     */
+    protected final Object lock = new Object();
+
+    /**
+     * Set when properties are handled; used as a shortcut for logging to determine whether
this
+     * instance attempts to put request threads in a wait state or not.
+     */
+    private boolean waitEnabled;
+
+    /**
+     * A count of the number of request threads that are currently in the wait state (including
+     * those on their way into or out of it).
+     */
+    protected int waitingPollRequestsCount;
+
+    /**
+     * A Map(notification Object for a waited request thread, Boolean.TRUE).
+     */
+    private ConcurrentHashMap currentWaitedRequests;
+
+    //--------------------------------------------------------------------------
+    //
+    // Properties
+    //
+    //--------------------------------------------------------------------------
+
+    //----------------------------------
+    //  clientWaitInterval
+    //----------------------------------
+
+    protected int clientWaitInterval = 0;
+
+    /**
+     * Retrieves the number of milliseconds the client will wait after receiving a response
for
+     * a poll with server wait before it issues its next poll request.
+     * A value of zero or less causes the client to use its default polling interval (based
on the
+     * channel's polling-interval-millis configuration) and this value is ignored.
+     * A value greater than zero will cause the client to wait for the specified interval
before
+     * issuing its next poll request with a value of 1 triggering an immediate poll from
the client
+     * as soon as a waited poll response is received.
+     * @return The client wait interval.
+     */
+    public int getClientWaitInterval()
+    {
+        return clientWaitInterval;
+    }
+
+    /**
+     * Sets the number of milliseconds a client will wait after receiving a response for
a poll
+     * with server wait before it issues its next poll request.
+     * A value of zero or less causes the client to use its default polling interval (based
on the
+     * channel's polling-interval-millis configuration) and this value is ignored.
+     * A value greater than zero will cause the client to wait for the specified interval
before
+     * issuing its next poll request with a value of 1 triggering an immediate poll from
the client
+     * as soon as a waited poll response is received.
+     * This property does not effect polling clients that poll the server without a server
wait.
+     *
+     * @param value The number of milliseconds a client will wait before issuing its next
poll when the
+     *        server is configured to wait.
+     */
+    public void setClientWaitInterval(int value)
+    {
+        clientWaitInterval = value;
+    }
+
+    //----------------------------------
+    //  maxWaitingPollRequests
+    //----------------------------------
+
+    protected int maxWaitingPollRequests = 0;
+
+    /**
+     * Retrieves the maximum number of server poll response threads that will be
+     * waiting for messages to arrive for clients.
+     * @return The maximum number of waiting poll requests.
+     */
+    public int getMaxWaitingPollRequests()
+    {
+        return maxWaitingPollRequests;
+    }
+
+    /**
+     * Sets the maximum number of server poll response threads that will be
+     * waiting for messages to arrive for clients.
+     *
+     * @param maxWaitingPollRequests The maximum number of server poll response threads
+     * that will be waiting for messages to arrive for the client.
+     */
+    public void setMaxWaitingPollRequests(int maxWaitingPollRequests)
+    {
+        this.maxWaitingPollRequests = maxWaitingPollRequests;
+        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval
> 0))
+        {
+            waitEnabled = true;
+            canWait = (waitingPollRequestsCount < maxWaitingPollRequests);
+        }
+    }
+
+    //----------------------------------
+    //  pollingEnabled
+    //----------------------------------
+
+    /**
+     *
+     * This is a property used on the client.
+     */
+    protected boolean piggybackingEnabled;
+
+    //----------------------------------
+    //  pollingEnabled
+    //----------------------------------
+
+    /**
+     *
+     * This is a property used on the client.
+     */
+    protected boolean pollingEnabled;
+
+    //----------------------------------
+    //  pollingIntervalMillis
+    //----------------------------------
+
+    /**
+     *
+     * This is a property used on the client.
+     */
+    protected long pollingIntervalMillis = -1;
+
+    //----------------------------------
+    //  waitInterval
+    //----------------------------------
+
+    protected long waitInterval = 0;
+
+    /**
+     * Retrieves the number of milliseconds the server poll response thread will be
+     * waiting for messages to arrive for the client.
+     * @return The wait interval.
+     */
+    public long getWaitInterval()
+    {
+        return waitInterval;
+    }
+
+    /**
+     * Sets the number of milliseconds the server poll response thread will be
+     * waiting for messages to arrive for the client.
+     *
+     * @param waitInterval The number of milliseconds the server poll response thread will
be
+     * waiting for messages to arrive for the client.
+     */
+    public void setWaitInterval(long waitInterval)
+    {
+        this.waitInterval = waitInterval;
+        if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval
> 0))
+        {
+            waitEnabled = true;
+            canWait = (waitingPollRequestsCount < maxWaitingPollRequests);
+        }
+    }
+
+    //----------------------------------
+    //  waitingPollRequestsCount
+    //----------------------------------
+
+    /**
+     * Retrieves the count of the number of request threads that are currently in the wait
state
+     * (including those on their way into or out of it).
+     *
+     * @return The count of the number of request threads that are currently in the wait
state.
+     */
+    public int getWaitingPollRequestsCount()
+    {
+        return waitingPollRequestsCount;
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Public Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     *
+     * Returns a <code>ConfigMap</code> of endpoint properties that the client
+     * needs. This includes properties from <code>super.describeEndpoint</code>
+     * and additional <code>BaseHTTPEndpoint</code> specific properties under
+     * "properties" key.
+     */
+    @Override
+    public ConfigMap describeEndpoint()
+    {
+        ConfigMap endpointConfig = super.describeEndpoint();
+
+        boolean createdProperties = false;
+        ConfigMap properties = endpointConfig.getPropertyAsMap(PROPERTIES_ELEMENT, null);
+        if (properties == null)
+        {
+            properties = new ConfigMap();
+            createdProperties = true;
+        }
+
+        if (pollingEnabled)
+        {
+            ConfigMap pollingEnabled = new ConfigMap();
+            // Adding as a value rather than attribute to the parent
+            pollingEnabled.addProperty(EMPTY_STRING, TRUE_STRING);
+            properties.addProperty(POLLING_ENABLED, pollingEnabled);
+        }
+
+        if (pollingIntervalMillis > -1)
+        {
+            ConfigMap pollingInterval = new ConfigMap();
+            // Adding as a value rather than attribute to the parent
+            pollingInterval.addProperty(EMPTY_STRING, String.valueOf(pollingIntervalMillis));
+            properties.addProperty(POLLING_INTERVAL_MILLIS, pollingInterval);
+        }
+
+        if (piggybackingEnabled)
+        {
+            ConfigMap piggybackingEnabled = new ConfigMap();
+            // Adding as a value rather than attribute to the parent
+            piggybackingEnabled.addProperty(EMPTY_STRING, String.valueOf(piggybackingEnabled));
+            properties.addProperty(ConfigurationConstants.PIGGYBACKING_ENABLED_ELEMENT, piggybackingEnabled);
+        }
+
+        if (createdProperties && properties.size() > 0)
+            endpointConfig.addProperty(ConfigurationConstants.PROPERTIES_ELEMENT, properties);
+
+        return endpointConfig;
+    }
+
+    /**
+     * Sets up monitoring of waited poll requests so they can be notified and exit when the
+     * endpoint stops.
+     *
+     * @see flex.messaging.endpoints.AbstractEndpoint#start()
+     */
+    @Override
+    public void start()
+    {
+        if (isStarted())
+            return;
+
+        super.start();
+
+        currentWaitedRequests = new ConcurrentHashMap();
+    }
+
+    /**
+     * Ensures that no poll requests in a wait state are left un-notified when the endpoint
stops.
+     *
+     * @see flex.messaging.endpoints.AbstractEndpoint#stop()
+     */
+    @Override
+    public void stop()
+    {
+        if (!isStarted())
+            return;
+
+        // Notify any currently waiting polls.
+        for (Object notifier : currentWaitedRequests.keySet())
+        {
+            synchronized (notifier)
+            {
+                notifier.notifyAll(); // Break any current waits.
+            }
+        }
+        currentWaitedRequests = null;
+
+        super.stop();
+    }
+
+    /**
+     * (non-Javaodc)
+     * @see flex.messaging.client.PollWaitListener#waitStart(Object)
+     */
+    public void waitStart(Object notifier)
+    {
+        currentWaitedRequests.put(notifier, Boolean.TRUE);
+    }
+
+    /**
+     * (non-Javaodc)
+     * @see flex.messaging.client.PollWaitListener#waitEnd(Object)
+     */
+    public void waitEnd(Object notifier)
+    {
+        if (currentWaitedRequests != null)
+            currentWaitedRequests.remove(notifier);
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Protected Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Overrides the base poll handling to support optionally putting Http request handling
threads
+     * into a wait state until messages are available to be delivered in the poll response
or a timeout is reached.
+     * The number of threads that may be put in a wait state is bounded by <code>max-waiting-poll-requests</code>
+     * and waits will only be attempted if the canWait flag that is based on the <code>max-waiting-poll-requests</code>
+     * and the specified <code>wait-interval</code> is true.
+     *
+     * @param flexClient The FlexClient that issued the poll request.
+     * @param pollCommand The poll command from the client.
+     * @return The flush info used to build the poll response.
+     */
+    @Override
+    protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand)
+    {
+        FlushResult flushResult = null;
+        if (canWait && !pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER))
+        {
+            FlexSession session = FlexContext.getFlexSession();
+            // If canWait is true it means we currently have less than the max number of
allowed waiting threads.
+
+            // We need to protect writes/reads to the wait count with the endpoint's lock.
+            // Also, we have to be careful to handle the case where two threads get to this
point when only
+            // one wait spot remains; one thread will win and the other needs to revert to
a non-waitable poll.
+            boolean thisThreadCanWait;
+            synchronized (lock)
+            {
+                ++waitingPollRequestsCount;
+                if (waitingPollRequestsCount == maxWaitingPollRequests)
+                {
+                    thisThreadCanWait = true; // This thread got the last wait spot.
+                    canWait = false;
+                }
+                else if (waitingPollRequestsCount > maxWaitingPollRequests)
+                {
+                    thisThreadCanWait = false; // This thread was beaten out for the last
spot.
+                    --waitingPollRequestsCount; // Decrement the count because we're not
going to try a poll with wait.
+                    canWait = false; // All the wait spots are currently occupied so prevent
further attempts for now.
+                }
+                else
+                {
+                    // We haven't hit the limit yet, allow this thread to wait.
+                    thisThreadCanWait = true;
+                }
+            }
+
+            // Check the max waiting connections per session count
+            if (thisThreadCanWait)
+            {
+                String userAgentValue = FlexContext.getHttpRequest().getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
+                UserAgentSettings agentSettings = userAgentManager.match(userAgentValue);
+                synchronized(session)
+                {
+                    if (agentSettings != null)
+                        session.maxConnectionsPerSession = agentSettings.getMaxPersistentConnectionsPerSession();
+
+                    ++session.streamingConnectionsCount;
+                    if (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
+                            || session.streamingConnectionsCount <= session.maxConnectionsPerSession)
+                    {
+                        thisThreadCanWait = true; // We haven't hit the limit yet, allow
the wait.
+                    }
+                    else // (session.streamingConnectionsCount > session.maxConnectionsPerSession)
+                    {
+                        thisThreadCanWait = false; // no more from this client
+                        --session.streamingConnectionsCount;
+                    }
+                }
+
+                if (!thisThreadCanWait)
+                {
+                    // Decrement the waiting poll count, since this poll isn't going to wait.
+                    synchronized (lock)
+                    {
+                        --waitingPollRequestsCount;
+                        if (waitingPollRequestsCount < maxWaitingPollRequests)
+                            canWait = true;
+                    }
+                    if (Log.isDebug())
+                    {
+                        log.debug("Max long-polling requests per session limit (" + session.maxConnectionsPerSession
+ ") has been reached, this poll won't wait.");
+                    }
+                }
+
+            }
+
+            if (thisThreadCanWait)
+            {
+                if (Log.isDebug())
+                    log.debug("Number of waiting threads for endpoint with id '"+ getId()
+"' is " + waitingPollRequestsCount + ".");
+
+                try
+                {
+                    flushResult  = flexClient.pollWithWait(getId(), FlexContext.getFlexSession(),
this, waitInterval);
+                    if (flushResult != null)
+                    {
+                        // Prevent busy-polling due to multiple clients sharing a session
and swapping each other out too quickly.
+                        if ((flushResult instanceof PollFlushResult) && ((PollFlushResult)flushResult).isAvoidBusyPolling()
&& (flushResult.getNextFlushWaitTimeMillis() < DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS))
+                        {
+                            // Force the client polling interval to match the default defined
in the client PollingChannel.
+                            flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
+                        }
+                        else if ((clientWaitInterval > 0) && (flushResult.getNextFlushWaitTimeMillis()
== 0))
+                        {
+                            // If the FlushResult doesn't specify it's own flush wait time,
use the configured clientWaitInterval if defined.
+                            flushResult.setNextFlushWaitTimeMillis(clientWaitInterval);
+                        }
+                    }
+                }
+                finally
+                {
+                    // We're done waiting so decrement the count of waiting threads and update
the canWait flag if necessary
+                    synchronized (lock)
+                    {
+                        --waitingPollRequestsCount;
+                        if (waitingPollRequestsCount < maxWaitingPollRequests)
+                            canWait = true;
+                    }
+                    synchronized (session)
+                    {
+                        --session.streamingConnectionsCount;
+                    }
+
+                    if (Log.isDebug())
+                        log.debug("Number of waiting threads for endpoint with id '"+ getId()
+"' is " + waitingPollRequestsCount + ".");
+                }
+            }
+        }
+        else if (Log.isDebug() && waitEnabled)
+        {
+            if (pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER))
+                log.debug("Suppressing poll wait for this request because it is part of a
batch of messages to process.");
+            else
+                log.debug("Max waiting poll requests limit '" + maxWaitingPollRequests +
"' has been reached for endpoint '" + getId() + "'. FlexClient with id '"+ flexClient.getId()
+ "' will poll with no wait.");
+        }
+
+        // If we weren't able to do a poll with wait above for any reason just run the base
poll handling logic.
+        if (flushResult == null)
+        {
+            flushResult = super.handleFlexClientPoll(flexClient, pollCommand);
+            // If this is an excess poll request that we couldn't wait on, make sure the
client doesn't poll the endpoint too aggressively.
+            // In this case, force a client wait to match the default polling interval defined
in the client PollingChannel.
+            if ( waitEnabled && (pollingIntervalMillis < DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS))
+            {
+                if (flushResult == null)
+                {
+                    flushResult = new FlushResult();
+                }
+                flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
+            }
+        }
+
+        return flushResult;
+    }
+}


Mime
View raw message