flex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cd...@apache.org
Subject [06/51] [partial] FLEX-34306 - [BlazeDS] Make the BlazeDS build run on Windows machines - Added some mkdir commands to the ANT Build.java - Did some fine-tuning to resolve some compile errors
Date Mon, 05 May 2014 20:08:20 GMT
http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/4f6a3052/modules/core/src/flex/messaging/endpoints/StreamingHTTPEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/endpoints/StreamingHTTPEndpoint.java b/modules/core/src/flex/messaging/endpoints/StreamingHTTPEndpoint.java
old mode 100755
new mode 100644
index aadf1ba..a5e1c66
--- a/modules/core/src/flex/messaging/endpoints/StreamingHTTPEndpoint.java
+++ b/modules/core/src/flex/messaging/endpoints/StreamingHTTPEndpoint.java
@@ -1,239 +1,239 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.endpoints;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpServletResponse;
-
-import flex.management.runtime.messaging.endpoints.StreamingHTTPEndpointControl;
-import flex.messaging.MessageBroker;
-import flex.messaging.endpoints.amf.AMFFilter;
-import flex.messaging.endpoints.amf.BatchProcessFilter;
-import flex.messaging.endpoints.amf.MessageBrokerFilter;
-import flex.messaging.endpoints.amf.SerializationFilter;
-import flex.messaging.endpoints.amf.SessionFilter;
-import flex.messaging.io.MessageIOConstants;
-import flex.messaging.io.TypeMarshallingContext;
-import flex.messaging.io.amfx.AmfxOutput;
-import flex.messaging.log.Log;
-import flex.messaging.log.LogCategories;
-import flex.messaging.messages.Message;
-
-/**
- * Extension to the HTTPEndpoint to support streaming HTTP connections to connected
- * clients.
- * Each streaming connection managed by this endpoint consumes one of the request
- * handler threads provided by the servlet container, so it is not highly scalable
- * but offers performance advantages over client polling for clients receiving a steady,
- * rapid stream of pushed messages.
- * This endpoint does not support polling clients and will fault any poll requests
- * that are received. To support polling clients use HTTPEndpoint instead.
- */
-public class StreamingHTTPEndpoint extends BaseStreamingHTTPEndpoint
-{
-    //--------------------------------------------------------------------------
-    //
-    // Public Constants
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * The log category for this endpoint.
-     */
-    public static final String LOG_CATEGORY = LogCategories.ENDPOINT_STREAMING_HTTP;
-
-    //--------------------------------------------------------------------------
-    //
-    // Constructors
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Constructs an unmanaged <code>StreamingHTTPEndpoint</code>.
-     */
-    public StreamingHTTPEndpoint()
-    {
-        this(false);
-    }
-
-    /**
-     * Constructs a <code>StreamingHTTPEndpoint</code> with the indicated management.
-     *
-     * @param enableManagement <code>true</code> if the <code>StreamingHTTPEndpoint</code>
-     * is manageable; <code>false</code> otherwise.
-     */
-    public StreamingHTTPEndpoint(boolean enableManagement)
-    {
-        super(enableManagement);
-    }
-
-    //--------------------------------------------------------------------------
-    //
-    // Protected Methods
-    //
-    //--------------------------------------------------------------------------
-
-    /**
-     * Create default filter chain or return current one if already present.
-     */
-    @Override protected AMFFilter createFilterChain()
-    {
-        AMFFilter serializationFilter = new SerializationFilter(getLogCategory());
-        AMFFilter batchFilter = new BatchProcessFilter();
-        AMFFilter sessionFilter = sessionRewritingEnabled? new SessionFilter() : null;
-        AMFFilter messageBrokerFilter = new MessageBrokerFilter(this);
-
-        serializationFilter.setNext(batchFilter);
-        if (sessionFilter != null)
-        {
-            batchFilter.setNext(sessionFilter);
-            sessionFilter.setNext(messageBrokerFilter);
-        }
-        else
-        {
-            batchFilter.setNext(messageBrokerFilter);
-        }
-
-        return serializationFilter;
-    }
-
-    /**
-     * Returns MessageIOConstants.XML_CONTENT_TYPE.
-     */
-    @Override protected String getResponseContentType()
-    {
-        return MessageIOConstants.XML_CONTENT_TYPE;
-    }
-
-    /**
-     * Returns the log category of the endpoint.
-     *
-     * @return The log category of the endpoint.
-     */
-    @Override protected String getLogCategory()
-    {
-        return LOG_CATEGORY;
-    }
-
-    /**
-     * Used internally for performance information gathering; not intended for
-     * public use. Serializes the message in AMFX format and returns the size
-     * of the serialized message.
-     *
-     * @param message Message to get the size for.
-     *
-     * @return The size of the message after message is serialized.
-     */
-    @Override protected long getMessageSizeForPerformanceInfo(Message message)
-    {
-        AmfxOutput amfxOut = new AmfxOutput(serializationContext);
-        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-        DataOutputStream dataOutStream = new DataOutputStream(outStream);
-        amfxOut.setOutputStream(dataOutStream);
-        try
-        {
-            amfxOut.writeObject(message);
-        }
-        catch (IOException e)
-        {
-            if (Log.isDebug())
-                log.debug("MPI exception while retrieving the size of the serialized message: " + e.toString());
-        }
-        return dataOutStream.size();
-    }
-
-    /**
-     * Returns the deserializer class name used by the endpoint.
-     *
-     * @return The deserializer class name used by the endpoint.
-     */
-    @Override protected String getDeserializerClassName()
-    {
-        return "flex.messaging.io.amfx.AmfxMessageDeserializer";
-    }
-
-    /**
-     * Returns the serializer class name used by the endpoint.
-     *
-     * @return The serializer class name used by the endpoint.
-     */
-    @Override protected String getSerializerClassName()
-    {
-        return "flex.messaging.io.amfx.AmfxMessageSerializer";
-    }
-
-    /**
-     * Invoked automatically to allow the <code>StreamingHTTPEndpoint</code> to setup its
-     * corresponding MBean control.
-     *
-     * @param broker The <code>MessageBroker</code> that manages this
-     * <code>StreamingHTTPEndpoint</code>.
-     */
-    @Override protected void setupEndpointControl(MessageBroker broker)
-    {
-        controller = new StreamingHTTPEndpointControl(this, broker.getControl());
-        controller.register();
-        setControl(controller);
-    }
-
-    /**
-     * Helper method invoked by the endpoint request handler thread cycling in wait-notify.
-     * Serializes messages and streams each to the client as a response chunk using streamChunk().
-     *
-     * @param messages The messages to serialize and push to the client.
-     * @param os The output stream the chunk will be written to.
-     * @param response The HttpServletResponse, used to flush the chunk to the client.
-     */
-    @Override protected void streamMessages(List messages, ServletOutputStream os, HttpServletResponse response) throws IOException
-    {
-        if (messages == null || messages.isEmpty())
-            return;
-
-        // Serialize each message as a separate chunk of bytes.
-        TypeMarshallingContext.setTypeMarshaller(getTypeMarshaller());
-        for (Iterator iter = messages.iterator(); iter.hasNext();)
-        {
-            Message message = (Message)iter.next();
-            addPerformanceInfo(message);
-
-            message = convertPushMessageToSmall(message);
-
-            if (Log.isDebug())
-                log.debug("Endpoint with id '" + getId() + "' is streaming message: " + message);
-
-            AmfxOutput amfxOut = new AmfxOutput(serializationContext);
-            ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-            DataOutputStream dataOutStream = new DataOutputStream(outStream);
-            amfxOut.setOutputStream(dataOutStream);
-
-            amfxOut.writeObject(message);
-            dataOutStream.flush();
-            byte[] messageBytes = outStream.toByteArray();
-            streamChunk(messageBytes, os, response);
-
-            if (isManaged())
-                ((StreamingHTTPEndpointControl)controller).incrementPushCount();
-        }
-        TypeMarshallingContext.setTypeMarshaller(null);
-    }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.endpoints;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletResponse;
+
+import flex.management.runtime.messaging.endpoints.StreamingHTTPEndpointControl;
+import flex.messaging.MessageBroker;
+import flex.messaging.endpoints.amf.AMFFilter;
+import flex.messaging.endpoints.amf.BatchProcessFilter;
+import flex.messaging.endpoints.amf.MessageBrokerFilter;
+import flex.messaging.endpoints.amf.SerializationFilter;
+import flex.messaging.endpoints.amf.SessionFilter;
+import flex.messaging.io.MessageIOConstants;
+import flex.messaging.io.TypeMarshallingContext;
+import flex.messaging.io.amfx.AmfxOutput;
+import flex.messaging.log.Log;
+import flex.messaging.log.LogCategories;
+import flex.messaging.messages.Message;
+
+/**
+ * Extension to the HTTPEndpoint to support streaming HTTP connections to connected
+ * clients.
+ * Each streaming connection managed by this endpoint consumes one of the request
+ * handler threads provided by the servlet container, so it is not highly scalable
+ * but offers performance advantages over client polling for clients receiving a steady,
+ * rapid stream of pushed messages.
+ * This endpoint does not support polling clients and will fault any poll requests
+ * that are received. To support polling clients use HTTPEndpoint instead.
+ */
+public class StreamingHTTPEndpoint extends BaseStreamingHTTPEndpoint
+{
+    //--------------------------------------------------------------------------
+    //
+    // Public Constants
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * The log category for this endpoint.
+     */
+    public static final String LOG_CATEGORY = LogCategories.ENDPOINT_STREAMING_HTTP;
+
+    //--------------------------------------------------------------------------
+    //
+    // Constructors
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Constructs an unmanaged <code>StreamingHTTPEndpoint</code>.
+     */
+    public StreamingHTTPEndpoint()
+    {
+        this(false);
+    }
+
+    /**
+     * Constructs a <code>StreamingHTTPEndpoint</code> with the indicated management.
+     *
+     * @param enableManagement <code>true</code> if the <code>StreamingHTTPEndpoint</code>
+     * is manageable; <code>false</code> otherwise.
+     */
+    public StreamingHTTPEndpoint(boolean enableManagement)
+    {
+        super(enableManagement);
+    }
+
+    //--------------------------------------------------------------------------
+    //
+    // Protected Methods
+    //
+    //--------------------------------------------------------------------------
+
+    /**
+     * Create default filter chain or return current one if already present.
+     */
+    @Override protected AMFFilter createFilterChain()
+    {
+        AMFFilter serializationFilter = new SerializationFilter(getLogCategory());
+        AMFFilter batchFilter = new BatchProcessFilter();
+        AMFFilter sessionFilter = sessionRewritingEnabled? new SessionFilter() : null;
+        AMFFilter messageBrokerFilter = new MessageBrokerFilter(this);
+
+        serializationFilter.setNext(batchFilter);
+        if (sessionFilter != null)
+        {
+            batchFilter.setNext(sessionFilter);
+            sessionFilter.setNext(messageBrokerFilter);
+        }
+        else
+        {
+            batchFilter.setNext(messageBrokerFilter);
+        }
+
+        return serializationFilter;
+    }
+
+    /**
+     * Returns MessageIOConstants.XML_CONTENT_TYPE.
+     */
+    @Override protected String getResponseContentType()
+    {
+        return MessageIOConstants.XML_CONTENT_TYPE;
+    }
+
+    /**
+     * Returns the log category of the endpoint.
+     *
+     * @return The log category of the endpoint.
+     */
+    @Override protected String getLogCategory()
+    {
+        return LOG_CATEGORY;
+    }
+
+    /**
+     * Used internally for performance information gathering; not intended for
+     * public use. Serializes the message in AMFX format and returns the size
+     * of the serialized message.
+     *
+     * @param message Message to get the size for.
+     *
+     * @return The size of the message after message is serialized.
+     */
+    @Override protected long getMessageSizeForPerformanceInfo(Message message)
+    {
+        AmfxOutput amfxOut = new AmfxOutput(serializationContext);
+        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+        DataOutputStream dataOutStream = new DataOutputStream(outStream);
+        amfxOut.setOutputStream(dataOutStream);
+        try
+        {
+            amfxOut.writeObject(message);
+        }
+        catch (IOException e)
+        {
+            if (Log.isDebug())
+                log.debug("MPI exception while retrieving the size of the serialized message: " + e.toString());
+        }
+        return dataOutStream.size();
+    }
+
+    /**
+     * Returns the deserializer class name used by the endpoint.
+     *
+     * @return The deserializer class name used by the endpoint.
+     */
+    @Override protected String getDeserializerClassName()
+    {
+        return "flex.messaging.io.amfx.AmfxMessageDeserializer";
+    }
+
+    /**
+     * Returns the serializer class name used by the endpoint.
+     *
+     * @return The serializer class name used by the endpoint.
+     */
+    @Override protected String getSerializerClassName()
+    {
+        return "flex.messaging.io.amfx.AmfxMessageSerializer";
+    }
+
+    /**
+     * Invoked automatically to allow the <code>StreamingHTTPEndpoint</code> to setup its
+     * corresponding MBean control.
+     *
+     * @param broker The <code>MessageBroker</code> that manages this
+     * <code>StreamingHTTPEndpoint</code>.
+     */
+    @Override protected void setupEndpointControl(MessageBroker broker)
+    {
+        controller = new StreamingHTTPEndpointControl(this, broker.getControl());
+        controller.register();
+        setControl(controller);
+    }
+
+    /**
+     * Helper method invoked by the endpoint request handler thread cycling in wait-notify.
+     * Serializes messages and streams each to the client as a response chunk using streamChunk().
+     *
+     * @param messages The messages to serialize and push to the client.
+     * @param os The output stream the chunk will be written to.
+     * @param response The HttpServletResponse, used to flush the chunk to the client.
+     */
+    @Override protected void streamMessages(List messages, ServletOutputStream os, HttpServletResponse response) throws IOException
+    {
+        if (messages == null || messages.isEmpty())
+            return;
+
+        // Serialize each message as a separate chunk of bytes.
+        TypeMarshallingContext.setTypeMarshaller(getTypeMarshaller());
+        for (Iterator iter = messages.iterator(); iter.hasNext();)
+        {
+            Message message = (Message)iter.next();
+            addPerformanceInfo(message);
+
+            message = convertPushMessageToSmall(message);
+
+            if (Log.isDebug())
+                log.debug("Endpoint with id '" + getId() + "' is streaming message: " + message);
+
+            AmfxOutput amfxOut = new AmfxOutput(serializationContext);
+            ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+            DataOutputStream dataOutStream = new DataOutputStream(outStream);
+            amfxOut.setOutputStream(dataOutStream);
+
+            amfxOut.writeObject(message);
+            dataOutStream.flush();
+            byte[] messageBytes = outStream.toByteArray();
+            streamChunk(messageBytes, os, response);
+
+            if (isManaged())
+                ((StreamingHTTPEndpointControl)controller).incrementPushCount();
+        }
+        TypeMarshallingContext.setTypeMarshaller(null);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/4f6a3052/modules/core/src/flex/messaging/endpoints/amf/AMFFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/endpoints/amf/AMFFilter.java b/modules/core/src/flex/messaging/endpoints/amf/AMFFilter.java
old mode 100755
new mode 100644
index b0f035e..9871adb
--- a/modules/core/src/flex/messaging/endpoints/amf/AMFFilter.java
+++ b/modules/core/src/flex/messaging/endpoints/amf/AMFFilter.java
@@ -1,53 +1,53 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.endpoints.amf;
-
-import flex.messaging.io.amf.ActionContext;
-
-import java.io.IOException;
-
-/**
- * Filters perform pre- and post-processing duties on the ActionContext,
- * which contains the message/invocation as well as conextual information
- * about it, following the standard pipe-and-filter design pattern.
- *
- * @author PS Neville
- */
-public abstract class AMFFilter
-{
-    protected AMFFilter next;
-
-    public AMFFilter()
-    {
-    }
-
-    public void setNext(AMFFilter next)
-    {
-        this.next = next;
-    }
-
-    public AMFFilter getNext()
-    {
-        return next;
-    }
-
-    /**
-     * The core business method.
-     */
-    public abstract void invoke(final ActionContext context) throws IOException;
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.endpoints.amf;
+
+import flex.messaging.io.amf.ActionContext;
+
+import java.io.IOException;
+
+/**
+ * Filters perform pre- and post-processing duties on the ActionContext,
+ * which contains the message/invocation as well as contextual information
+ * about it, following the standard pipe-and-filter design pattern.
+ *
+ * @author PS Neville
+ */
+public abstract class AMFFilter
+{
+    protected AMFFilter next;
+
+    public AMFFilter()
+    {
+    }
+
+    public void setNext(AMFFilter next)
+    {
+        this.next = next;
+    }
+
+    public AMFFilter getNext()
+    {
+        return next;
+    }
+
+    /**
+     * The core business method.
+     */
+    public abstract void invoke(final ActionContext context) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/4f6a3052/modules/core/src/flex/messaging/endpoints/amf/BatchProcessFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/endpoints/amf/BatchProcessFilter.java b/modules/core/src/flex/messaging/endpoints/amf/BatchProcessFilter.java
old mode 100755
new mode 100644
index 423a7ed..8a3b694
--- a/modules/core/src/flex/messaging/endpoints/amf/BatchProcessFilter.java
+++ b/modules/core/src/flex/messaging/endpoints/amf/BatchProcessFilter.java
@@ -1,73 +1,73 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.endpoints.amf;
-
-import flex.messaging.io.amf.ActionContext;
-import flex.messaging.io.amf.MessageBody;
-import flex.messaging.io.MessageIOConstants;
-import flex.messaging.io.RecoverableSerializationException;
-
-/**
- * Filter that breaks down the batched message buffer into individual invocations.
- *
- * @author PS Neville
- */
-public class BatchProcessFilter extends AMFFilter
-{
-    public BatchProcessFilter()
-    {
-    }
-
-    public void invoke(final ActionContext context)
-    {
-        // Process each action in the body
-        int bodyCount = context.getRequestMessage().getBodyCount();
-
-        // Report batch size in Debug mode
-        //gateway.getLogger().logDebug("Processing batch of " + bodyCount + " request(s)");
-
-        for (context.setMessageNumber(0); context.getMessageNumber() < bodyCount; context.incrementMessageNumber())
-        {
-            try
-            {
-                // create the response body
-                MessageBody responseBody = new MessageBody();
-                responseBody.setTargetURI(context.getRequestMessageBody().getResponseURI());
-
-                // append the response body to the output message
-                context.getResponseMessage().addBody(responseBody);
-
-                //Check that deserialized message body data type was valid. If not, skip this message.
-                Object o = context.getRequestMessageBody().getData();
-
-                if (o != null && o instanceof RecoverableSerializationException)
-                {
-                    context.getResponseMessageBody().setData(((RecoverableSerializationException)o).createErrorMessage());
-                    context.getResponseMessageBody().setReplyMethod(MessageIOConstants.STATUS_METHOD);
-                    continue;
-                }
-
-                // invoke next filter in the chain
-                next.invoke(context);
-            }
-            catch (Exception e)
-            {
-                // continue invoking on next message body despite error
-            }
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.endpoints.amf;
+
+import flex.messaging.io.amf.ActionContext;
+import flex.messaging.io.amf.MessageBody;
+import flex.messaging.io.MessageIOConstants;
+import flex.messaging.io.RecoverableSerializationException;
+
+/**
+ * Filter that breaks down the batched message buffer into individual invocations.
+ *
+ * @author PS Neville
+ */
+public class BatchProcessFilter extends AMFFilter
+{
+    public BatchProcessFilter()
+    {
+    }
+
+    public void invoke(final ActionContext context)
+    {
+        // Process each action in the body
+        int bodyCount = context.getRequestMessage().getBodyCount();
+
+        // Report batch size in Debug mode
+        //gateway.getLogger().logDebug("Processing batch of " + bodyCount + " request(s)");
+
+        for (context.setMessageNumber(0); context.getMessageNumber() < bodyCount; context.incrementMessageNumber())
+        {
+            try
+            {
+                // create the response body
+                MessageBody responseBody = new MessageBody();
+                responseBody.setTargetURI(context.getRequestMessageBody().getResponseURI());
+
+                // append the response body to the output message
+                context.getResponseMessage().addBody(responseBody);
+
+                //Check that deserialized message body data type was valid. If not, skip this message.
+                Object o = context.getRequestMessageBody().getData();
+
+                if (o != null && o instanceof RecoverableSerializationException)
+                {
+                    context.getResponseMessageBody().setData(((RecoverableSerializationException)o).createErrorMessage());
+                    context.getResponseMessageBody().setReplyMethod(MessageIOConstants.STATUS_METHOD);
+                    continue;
+                }
+
+                // invoke next filter in the chain
+                next.invoke(context);
+            }
+            catch (Exception e)
+            {
+                // continue invoking on next message body despite error
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/4f6a3052/modules/core/src/flex/messaging/endpoints/amf/LegacyFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/endpoints/amf/LegacyFilter.java b/modules/core/src/flex/messaging/endpoints/amf/LegacyFilter.java
old mode 100755
new mode 100644
index f7c1347..626f369
--- a/modules/core/src/flex/messaging/endpoints/amf/LegacyFilter.java
+++ b/modules/core/src/flex/messaging/endpoints/amf/LegacyFilter.java
@@ -1,339 +1,339 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.endpoints.amf;
-
-import flex.messaging.endpoints.BaseHTTPEndpoint;
-import flex.messaging.io.amf.ASObject;
-import flex.messaging.io.amf.ActionContext;
-import flex.messaging.io.amf.MessageBody;
-import flex.messaging.io.amf.MessageHeader;
-import flex.messaging.messages.Message;
-import flex.messaging.messages.RemotingMessage;
-import flex.messaging.messages.ErrorMessage;
-
-import java.io.IOException;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.lang.reflect.Array;
-
-/**
- * AMF Headers are of limited use because the apply to the entire AMF packet, which
- * may contain a batch of several requests.
- * <p>
- * Rather than relying on the Flash Player team to change the AMF specification,
- * Flex 1.5 introduced the concept of a Message Envelope that allowed them to provide
- * message level headers that apply to a single request body.
- * </p>
- * <p>
- * Essentially they introduced one more layer of indirection with an ASObject of type &quot;Envelope&quot;
- * that had two properties:<br />
- * - <i>headers</i>, which was an array of Header structures<br />
- * - <i>body</i>, which was the actual data of the request (typically an array of arguments)
- * </p>
- * <p>
- * To save space on the wire, a Header structure was simply an array. The first element was
- * the header name as a String, and was the only required field. The second element, a boolean,
- * indicated whether the header must be understood. The third element, any Object, represented
- * the header value, if required.
- * </p>
- *
- * @author Peter Farland
- */
-public class LegacyFilter extends AMFFilter
-{
-    public static final String LEGACY_ENVELOPE_FLAG_KEY = "_flag";
-    public static final String LEGACY_ENVELOPE_FLAG_VALUE = "Envelope";
-    public static final String LEGACY_SECURITY_HEADER_NAME = "Credentials";
-    public static final String LEGACY_SECURITY_PRINCIPAL = "userid";
-    public static final String LEGACY_SECURITY_CREDENTIALS = "password";
-    
-    private BaseHTTPEndpoint endpoint;
-
-    public LegacyFilter(BaseHTTPEndpoint endpoint)
-    {
-        this.endpoint = endpoint;
-    }
-
-    public void invoke(final ActionContext context) throws IOException
-    {
-        MessageBody requestBody = context.getRequestMessageBody();
-        context.setLegacy(true);
-
-        // Parameters are usually sent as an AMF Array
-        Object data = requestBody.getData();
-        List newParams = null;
-
-        // Check whether we're a new Flex 2.0 Messaging request
-        if (data != null)
-        {
-            if (data.getClass().isArray())
-            {
-                int paramLength = Array.getLength(data);
-                if (paramLength == 1)
-                {
-                    Object obj = Array.get(data, 0);
-                    if (obj != null && obj instanceof Message)
-                    {
-                        context.setLegacy(false);
-                        newParams = new ArrayList();
-                        newParams.add(obj);
-                    }
-                }
-
-                // It was not a Flex 2.0 Message, but we have an array, use its contents as our params
-                if (newParams == null)
-                {
-                    newParams = new ArrayList();
-                    for (int i = 0; i < paramLength; i++)
-                    {
-                        try
-                        {
-                            newParams.add(Array.get(data, i));
-                        }
-                        catch (Throwable t)
-                        {
-                        }
-                    }
-                }
-            }
-            else if (data instanceof List)
-            {
-                List paramList = (List)data;
-                if (paramList.size() == 1)
-                {
-                    Object obj = paramList.get(0);
-                    if (obj != null && obj instanceof Message)
-                    {
-                        context.setLegacy(false);
-                        newParams = new ArrayList();
-                        newParams.add(obj);
-                    }
-                }
-
-                // It was not a Flex 2.0 Message, but we have a list, so use it as our params
-                if (newParams == null)
-                {
-                    newParams = (List)data;
-                }
-            }
-        }
-
-        // We still haven't found any lists of params, so create one with
-        // whatever data we have.
-        if (newParams == null)
-        {
-            newParams = new ArrayList();
-            newParams.add(data);
-
-        }
-
-        if (context.isLegacy())
-        {
-            newParams = legacyRequest(context, newParams);
-        }
-
-        requestBody.setData(newParams);
-
-
-        next.invoke(context);
-
-
-        if (context.isLegacy())
-        {
-            MessageBody responseBody = context.getResponseMessageBody();
-            Object response = responseBody.getData();
-
-            if (response instanceof ErrorMessage)
-            {
-                ErrorMessage error = (ErrorMessage)response;
-                ASObject aso = new ASObject();
-                aso.put("message", error.faultString);
-                aso.put("code", error.faultCode);
-                aso.put("details", error.faultDetail);
-                aso.put("rootCause", error.rootCause);
-                response = aso;
-            }
-            else if (response instanceof Message)
-            {
-                response = ((Message)response).getBody();
-            }
-            responseBody.setData(response);
-        }
-    }
-
-    private List legacyRequest(ActionContext context, List oldParams)
-    {
-        List newParams = new ArrayList(1);
-        Map headerMap = new HashMap();
-        Object body = oldParams;
-        Message message = null;
-        MessageBody requestBody = context.getRequestMessageBody();
-
-        // Legacy Packet Security
-        List packetHeaders = context.getRequestMessage().getHeaders();
-        packetCredentials(packetHeaders, headerMap);
-        
-
-        // Legacy Body
-        if (oldParams.size() == 1)
-        {
-            Object obj = oldParams.get(0);
-
-            if (obj != null && obj instanceof ASObject)
-            {
-                ASObject aso = (ASObject)obj;
-
-                // Unwrap legacy Flex 1.5 Envelope type
-                if (isEnvelope(aso))
-                {
-                    body = aso.get("data");
-
-                    // Envelope level headers
-                    Object h = aso.get("headers");
-                    if (h != null && h instanceof List)
-                    {
-                        readEnvelopeHeaders((List)h, headerMap);
-                        envelopeCredentials(headerMap);
-                    }
-                }
-            }
-        }
-
-        // Convert legacy body into a RemotingMessage
-        message = createMessage(requestBody, body, headerMap);
-        newParams.add(message);
-        return newParams;
-    }
-
-    private boolean isEnvelope(ASObject aso)
-    {
-        String flag = null;
-        Object f = aso.get(LEGACY_ENVELOPE_FLAG_KEY);
-        if (f != null && f instanceof String)
-            flag = (String)f;
-
-        if (flag != null && flag.equalsIgnoreCase(LEGACY_ENVELOPE_FLAG_VALUE))
-        {
-            return true;
-        }
-
-        return false;
-    }
-
-
-    private RemotingMessage createMessage(MessageBody messageBody, Object body, Map headerMap)
-    {
-        RemotingMessage remotingMessage = new RemotingMessage();
-        // Assigning an empty String, MessageBroker expects non-null messageId.        
-        remotingMessage.setMessageId("");  
-        remotingMessage.setBody(body);
-        remotingMessage.setHeaders(headerMap);
-
-        // Decode legacy target URI into destination.operation
-        String targetURI = messageBody.getTargetURI();
-
-        int dotIndex = targetURI.lastIndexOf(".");
-        if (dotIndex > 0)
-        {
-            String destination = targetURI.substring(0, dotIndex);
-            remotingMessage.setDestination(destination);
-        }
-
-        if (targetURI.length() > dotIndex)
-        {
-            String operation = targetURI.substring(dotIndex + 1);
-            remotingMessage.setOperation(operation);
-        }
-
-        return remotingMessage;
-    }
-
-
-    private Map readEnvelopeHeaders(List headers, Map headerMap)
-    {
-        int count = headers.size();
-
-        for (int i = 0; i < count; i++)
-        {
-            Object obj = headers.get(i);
-
-            //We currently expect a plain old AS Array
-            if (obj != null && obj instanceof List)
-            {
-                List h = (List)obj;
-
-                Object name = null;
-                //Object mustUnderstand = null;
-                Object data = null;
-
-                int numFields = h.size();
-
-                //The array must have exactly three (3) fields
-                if (numFields == 3)
-                {
-                    name = h.get(0);
-
-                    if (name != null && name instanceof String)
-                    {
-                        //mustUnderstand = h.get(1);
-                        data = h.get(2);
-                        headerMap.put(name, data);
-                    }
-                }
-            }
-        }
-
-        return headerMap;
-    }
-
-    private void envelopeCredentials(Map headers)
-    {
-        // Process Legacy Security Credentials
-        Object obj = headers.get(LEGACY_SECURITY_HEADER_NAME);
-        if (obj != null && obj instanceof ASObject)
-        {
-            ASObject header = (ASObject)obj;
-            String principal = (String)header.get(LEGACY_SECURITY_PRINCIPAL);
-            Object credentials = header.get(LEGACY_SECURITY_CREDENTIALS);
-            endpoint.getMessageBroker().getLoginManager().login(principal, credentials.toString());
-        }
-        headers.remove(LEGACY_SECURITY_HEADER_NAME);
-    }
-
-    private void packetCredentials(List packetHeaders, Map headers)
-    {
-        if (packetHeaders.size() > 0)
-        {
-            for (Iterator iter = packetHeaders.iterator(); iter.hasNext();)
-            {
-                MessageHeader header = (MessageHeader)iter.next();
-                if (header.getName().equals(LEGACY_SECURITY_HEADER_NAME))
-                {
-                    Map loginInfo = (Map)header.getData();
-                    String principal = loginInfo.get(LEGACY_SECURITY_PRINCIPAL).toString();
-                    Object credentials = loginInfo.get(LEGACY_SECURITY_CREDENTIALS);
-                    endpoint.getMessageBroker().getLoginManager().login(principal, credentials.toString());
-                    break;
-                }
-            }
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.endpoints.amf;
+
+import flex.messaging.endpoints.BaseHTTPEndpoint;
+import flex.messaging.io.amf.ASObject;
+import flex.messaging.io.amf.ActionContext;
+import flex.messaging.io.amf.MessageBody;
+import flex.messaging.io.amf.MessageHeader;
+import flex.messaging.messages.Message;
+import flex.messaging.messages.RemotingMessage;
+import flex.messaging.messages.ErrorMessage;
+
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.lang.reflect.Array;
+
+/**
+ * AMF Headers are of limited use because they apply to the entire AMF packet, which
+ * may contain a batch of several requests.
+ * <p>
+ * Rather than relying on the Flash Player team to change the AMF specification,
+ * Flex 1.5 introduced the concept of a Message Envelope that allowed them to provide
+ * message level headers that apply to a single request body.
+ * </p>
+ * <p>
+ * Essentially they introduced one more layer of indirection with an ASObject of type &quot;Envelope&quot;
+ * that had two properties:<br />
+ * - <i>headers</i>, which was an array of Header structures<br />
+ * - <i>body</i>, which was the actual data of the request (typically an array of arguments)
+ * </p>
+ * <p>
+ * To save space on the wire, a Header structure was simply an array. The first element was
+ * the header name as a String, and was the only required field. The second element, a boolean,
+ * indicated whether the header must be understood. The third element, any Object, represented
+ * the header value, if required.
+ * </p>
+ *
+ * @author Peter Farland
+ */
+public class LegacyFilter extends AMFFilter
+{
+    public static final String LEGACY_ENVELOPE_FLAG_KEY = "_flag";
+    public static final String LEGACY_ENVELOPE_FLAG_VALUE = "Envelope";
+    public static final String LEGACY_SECURITY_HEADER_NAME = "Credentials";
+    public static final String LEGACY_SECURITY_PRINCIPAL = "userid";
+    public static final String LEGACY_SECURITY_CREDENTIALS = "password";
+    
+    private BaseHTTPEndpoint endpoint;
+
+    /**
+     * Creates a legacy filter bound to the given endpoint.
+     *
+     * @param endpoint the endpoint stored on this filter; the credential handling
+     *                 methods use it to reach the message broker's login manager
+     */
+    public LegacyFilter(BaseHTTPEndpoint endpoint)
+    {
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Normalizes the request body's data into a parameter list, invokes the next filter
+     * in the chain and, for legacy requests, unwraps the response afterwards.
+     * <p>
+     * A request whose data is an array or List holding exactly one <code>Message</code>
+     * is treated as a Flex 2.0 messaging request. Every other request is marked legacy
+     * and converted by <code>legacyRequest</code>.
+     * </p>
+     *
+     * @param context the action context for the request body being processed
+     * @throws IOException as declared for the filter chain
+     */
+    public void invoke(final ActionContext context) throws IOException
+    {
+        MessageBody requestBody = context.getRequestMessageBody();
+        // Assume a legacy request until a Flex 2.0 Message is found.
+        context.setLegacy(true);
+
+        // Parameters are usually sent as an AMF Array
+        Object data = requestBody.getData();
+        Message flexMessage = soleMessage(data);
+        List newParams;
+
+        if (flexMessage != null)
+        {
+            context.setLegacy(false);
+            newParams = new ArrayList(1);
+            newParams.add(flexMessage);
+        }
+        else
+        {
+            newParams = toParameterList(data);
+        }
+
+        if (context.isLegacy())
+        {
+            newParams = legacyRequest(context, newParams);
+        }
+
+        requestBody.setData(newParams);
+
+        next.invoke(context);
+
+        // Re-read the flag from the context rather than caching it locally.
+        if (context.isLegacy())
+        {
+            MessageBody responseBody = context.getResponseMessageBody();
+            responseBody.setData(toLegacyResponse(responseBody.getData()));
+        }
+    }
+
+    /**
+     * Returns the single Message wrapped by the given data, or null when the data is not
+     * an array or List containing exactly one element that is a Message.
+     */
+    private static Message soleMessage(Object data)
+    {
+        Object only = null;
+
+        if (data == null)
+        {
+            return null;
+        }
+
+        if (data.getClass().isArray())
+        {
+            if (Array.getLength(data) == 1)
+            {
+                only = Array.get(data, 0);
+            }
+        }
+        else if (data instanceof List)
+        {
+            List list = (List)data;
+            if (list.size() == 1)
+            {
+                only = list.get(0);
+            }
+        }
+
+        return (only instanceof Message) ? (Message)only : null;
+    }
+
+    /**
+     * Converts request data into a parameter list: array contents are copied into a new
+     * list, a List is returned as is, and anything else (including null) is wrapped in a
+     * single-element list.
+     */
+    private static List toParameterList(Object data)
+    {
+        if (data != null && data.getClass().isArray())
+        {
+            int paramLength = Array.getLength(data);
+            List params = new ArrayList(paramLength);
+            // Array.get cannot fail for an index below the array's own length, so no
+            // catch-all is used here; a swallowed Throwable would only hide serious Errors.
+            for (int i = 0; i < paramLength; i++)
+            {
+                params.add(Array.get(data, i));
+            }
+            return params;
+        }
+
+        if (data instanceof List)
+        {
+            return (List)data;
+        }
+
+        List single = new ArrayList(1);
+        single.add(data);
+        return single;
+    }
+
+    /**
+     * Converts a response into its legacy form: an ErrorMessage becomes an ASObject with
+     * "message", "code", "details" and "rootCause" entries, any other Message is replaced
+     * by its body, and every other value is returned unchanged.
+     */
+    private static Object toLegacyResponse(Object response)
+    {
+        if (response instanceof ErrorMessage)
+        {
+            ErrorMessage error = (ErrorMessage)response;
+            ASObject aso = new ASObject();
+            aso.put("message", error.faultString);
+            aso.put("code", error.faultCode);
+            aso.put("details", error.faultDetail);
+            aso.put("rootCause", error.rootCause);
+            return aso;
+        }
+
+        if (response instanceof Message)
+        {
+            return ((Message)response).getBody();
+        }
+
+        return response;
+    }
+
+    /**
+     * Converts legacy request parameters into a single-element list holding the
+     * equivalent RemotingMessage.
+     * <p>
+     * Packet level credentials are processed first. If the only parameter is an ASObject
+     * flagged as an Envelope, its "data" entry becomes the message body and its "headers"
+     * entry (when it is a List) is read into the message headers and checked for
+     * credentials. Otherwise the parameter list itself is the body.
+     * </p>
+     *
+     * @param context   the action context supplying the request body and packet headers
+     * @param oldParams the parameters extracted from the request body
+     * @return a new list containing only the created RemotingMessage
+     */
+    private List legacyRequest(ActionContext context, List oldParams)
+    {
+        Map headerMap = new HashMap();
+        MessageBody requestBody = context.getRequestMessageBody();
+
+        // Legacy Packet Security
+        packetCredentials(context.getRequestMessage().getHeaders(), headerMap);
+
+        // Legacy Body: the whole parameter list unless an Envelope says otherwise
+        Object body = oldParams;
+        Object first = (oldParams.size() == 1) ? oldParams.get(0) : null;
+
+        // Unwrap legacy Flex 1.5 Envelope type
+        if (first instanceof ASObject && isEnvelope((ASObject)first))
+        {
+            ASObject envelope = (ASObject)first;
+            body = envelope.get("data");
+
+            // Envelope level headers
+            Object envelopeHeaders = envelope.get("headers");
+            if (envelopeHeaders instanceof List)
+            {
+                readEnvelopeHeaders((List)envelopeHeaders, headerMap);
+                envelopeCredentials(headerMap);
+            }
+        }
+
+        // Convert legacy body into a RemotingMessage
+        List wrapped = new ArrayList(1);
+        wrapped.add(createMessage(requestBody, body, headerMap));
+        return wrapped;
+    }
+
+    /**
+     * Tells whether the given object is a legacy Envelope, i.e. whether its
+     * LEGACY_ENVELOPE_FLAG_KEY entry is a String equal to LEGACY_ENVELOPE_FLAG_VALUE,
+     * ignoring case.
+     *
+     * @param aso the object to inspect
+     * @return true if the flag entry marks the object as an Envelope
+     */
+    private boolean isEnvelope(ASObject aso)
+    {
+        Object flag = aso.get(LEGACY_ENVELOPE_FLAG_KEY);
+        return flag instanceof String
+                && ((String)flag).equalsIgnoreCase(LEGACY_ENVELOPE_FLAG_VALUE);
+    }
+
+
+    /**
+     * Builds a RemotingMessage from a legacy request body.
+     * <p>
+     * The target URI is split at its last dot: the text before it becomes the destination
+     * and the text after it becomes the operation.
+     * </p>
+     *
+     * @param messageBody the request body supplying the target URI
+     * @param body        the value to use as the message body
+     * @param headerMap   the headers to set on the message
+     * @return the populated RemotingMessage
+     */
+    private RemotingMessage createMessage(MessageBody messageBody, Object body, Map headerMap)
+    {
+        RemotingMessage message = new RemotingMessage();
+        // MessageBroker expects a non-null messageId, so assign an empty String.
+        message.setMessageId("");
+        message.setBody(body);
+        message.setHeaders(headerMap);
+
+        // Decode legacy target URI into destination.operation
+        String targetURI = messageBody.getTargetURI();
+        int lastDot = targetURI.lastIndexOf(".");
+
+        // No dot (-1) or a leading dot (0) leaves the destination unset.
+        if (lastDot > 0)
+        {
+            message.setDestination(targetURI.substring(0, lastDot));
+        }
+
+        // lastIndexOf never returns more than length() - 1, so the operation is always
+        // set: the text after the last dot, or the whole URI when there is no dot.
+        message.setOperation(targetURI.substring(lastDot + 1));
+
+        return message;
+    }
+
+
+    /**
+     * Copies envelope headers into the given header map.
+     * <p>
+     * Only entries that are Lists of exactly three fields with a String in the first
+     * position are used: the first field is the header name and the third its value.
+     * All other entries are ignored.
+     * </p>
+     *
+     * @param headers   the envelope's header structures
+     * @param headerMap the map that receives name/value pairs
+     * @return the same headerMap instance that was passed in
+     */
+    private Map readEnvelopeHeaders(List headers, Map headerMap)
+    {
+        for (Iterator iter = headers.iterator(); iter.hasNext();)
+        {
+            Object entry = iter.next();
+
+            // We currently expect a plain old AS Array
+            if (!(entry instanceof List))
+            {
+                continue;
+            }
+
+            List fields = (List)entry;
+
+            // The array must have exactly three (3) fields
+            if (fields.size() != 3)
+            {
+                continue;
+            }
+
+            Object name = fields.get(0);
+            if (name instanceof String)
+            {
+                // The second field (mustUnderstand) is currently not read.
+                headerMap.put(name, fields.get(2));
+            }
+        }
+
+        return headerMap;
+    }
+
+    /**
+     * Processes legacy security credentials carried in the envelope headers.
+     * <p>
+     * If the LEGACY_SECURITY_HEADER_NAME entry is an ASObject, its userid and password
+     * entries are passed to the login manager. The entry is then removed from the map
+     * whether or not a login was attempted.
+     * </p>
+     *
+     * @param headers the envelope header map; modified in place
+     */
+    private void envelopeCredentials(Map headers)
+    {
+        // Process Legacy Security Credentials
+        Object obj = headers.get(LEGACY_SECURITY_HEADER_NAME);
+        if (obj instanceof ASObject)
+        {
+            ASObject header = (ASObject)obj;
+            Object userid = header.get(LEGACY_SECURITY_PRINCIPAL);
+            Object password = header.get(LEGACY_SECURITY_CREDENTIALS);
+
+            // These values come from the client. Convert with toString() (as
+            // packetCredentials does) instead of casting, and tolerate missing fields
+            // rather than failing here with a NullPointerException.
+            // NOTE(review): a missing field is handed to login() as null, on the
+            // assumption that the login manager rejects it -- confirm.
+            String principal = (userid != null) ? userid.toString() : null;
+            String credentials = (password != null) ? password.toString() : null;
+
+            endpoint.getMessageBroker().getLoginManager().login(principal, credentials);
+        }
+        headers.remove(LEGACY_SECURITY_HEADER_NAME);
+    }
+
+    /**
+     * Processes legacy security credentials carried in the AMF packet headers.
+     * <p>
+     * Only the first header named LEGACY_SECURITY_HEADER_NAME is considered. If its data
+     * is a Map, the userid and password entries are passed to the login manager.
+     * </p>
+     *
+     * @param packetHeaders the packet level MessageHeader instances
+     * @param headers       the message header map; not used by this method
+     */
+    private void packetCredentials(List packetHeaders, Map headers)
+    {
+        for (Iterator iter = packetHeaders.iterator(); iter.hasNext();)
+        {
+            MessageHeader header = (MessageHeader)iter.next();
+
+            // Constant-first comparison so a header without a name cannot throw.
+            if (!LEGACY_SECURITY_HEADER_NAME.equals(header.getName()))
+            {
+                continue;
+            }
+
+            // The header content comes from the client, so check its type instead of
+            // casting blindly; envelopeCredentials likewise skips malformed data.
+            Object data = header.getData();
+            if (data instanceof Map)
+            {
+                Map loginInfo = (Map)data;
+                Object userid = loginInfo.get(LEGACY_SECURITY_PRINCIPAL);
+                Object password = loginInfo.get(LEGACY_SECURITY_CREDENTIALS);
+
+                // NOTE(review): a missing field is handed to login() as null, on the
+                // assumption that the login manager rejects it -- confirm.
+                String principal = (userid != null) ? userid.toString() : null;
+                String credentials = (password != null) ? password.toString() : null;
+
+                endpoint.getMessageBroker().getLoginManager().login(principal, credentials);
+            }
+            break;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/flex-blazeds/blob/4f6a3052/modules/core/src/flex/messaging/endpoints/amf/MessageBrokerFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/flex/messaging/endpoints/amf/MessageBrokerFilter.java b/modules/core/src/flex/messaging/endpoints/amf/MessageBrokerFilter.java
old mode 100755
new mode 100644
index 5ceefc9..d8288f3
--- a/modules/core/src/flex/messaging/endpoints/amf/MessageBrokerFilter.java
+++ b/modules/core/src/flex/messaging/endpoints/amf/MessageBrokerFilter.java
@@ -1,180 +1,180 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package flex.messaging.endpoints.amf;
-
-import flex.messaging.FlexContext;
-import flex.messaging.FlexSession;
-import flex.messaging.MessageException;
-import flex.messaging.endpoints.AbstractEndpoint;
-import flex.messaging.io.MessageIOConstants;
-import flex.messaging.io.amf.ActionContext;
-import flex.messaging.io.amf.MessageBody;
-import flex.messaging.messages.CommandMessage;
-import flex.messaging.messages.ErrorMessage;
-import flex.messaging.messages.Message;
-import flex.messaging.messages.MessagePerformanceUtils;
-import flex.messaging.services.MessageService;
-import flex.messaging.util.StringUtils;
-import flex.messaging.log.LogCategories;
-import flex.messaging.log.Log;
-import flex.messaging.util.ExceptionUtil;
-import flex.messaging.util.UUIDUtils;
-
-import java.util.List;
-import java.lang.reflect.Array;
-
-/**
- * A simple bridge between the encoding/decoding functionality of
- * the AMF endpoint and the MessageBroker: this last filter in the
- * chain returns the message to the MessageBroker, which will then
- * locate the correct service to handle the message.
- */
-public class MessageBrokerFilter extends AMFFilter
-{
-    private static final int UNHANDLED_ERROR = 10000;
-    static final String LOG_CATEGORY = LogCategories.MESSAGE_GENERAL;
-
-    protected AbstractEndpoint endpoint;
-
-    public MessageBrokerFilter(AbstractEndpoint endpoint)
-    {
-        this.endpoint = endpoint;
-    }
-
-    public void invoke(final ActionContext context)
-    {
-        MessageBody request = context.getRequestMessageBody();
-        MessageBody response = context.getResponseMessageBody();
-
-        Message inMessage = request.getDataAsMessage();
-
-        Object outMessage = null;
-
-        String replyMethodName = MessageIOConstants.STATUS_METHOD;
-
-        try
-        {
-            // Lookup or create the correct FlexClient.
-            endpoint.setupFlexClient(inMessage);
-
-            // Assign a clientId if necessary.
-            // We don't need to assign clientIds to general poll requests.
-            if (inMessage.getClientId() == null &&
-                (!(inMessage instanceof CommandMessage) || ((CommandMessage)inMessage).getOperation() != CommandMessage.POLL_OPERATION))
-            {
-                Object clientId = UUIDUtils.createUUID();
-                inMessage.setClientId(clientId);
-            }
-
-            // Messages received via the AMF channel can be batched (by NetConnection on the client) and
-            // we must not put the handler thread into a poll-wait state if a poll command message is followed by
-            // or preceeded by other messages in the batch; the request-response loop must complete without waiting.
-            // If the poll command is the only message in the batch it's ok to wait.
-            // If it isn't ok to wait, tag the poll message with a header that short-circuits any potential poll-wait.
-            if (inMessage instanceof CommandMessage)
-            {
-                CommandMessage command = (CommandMessage)inMessage;
-                if ((command.getOperation() == CommandMessage.POLL_OPERATION) && (context.getRequestMessage().getBodyCount() != 1))
-                    command.setHeader(CommandMessage.SUPPRESS_POLL_WAIT_HEADER, Boolean.TRUE);
-            }
-
-            // If MPI is enabled update the MPI metrics on the object referred to by the context
-            // and the messages
-            if (context.isMPIenabled())
-                MessagePerformanceUtils.setupMPII(context, inMessage);
-
-            // Service the message.
-            outMessage = endpoint.serviceMessage(inMessage);
-
-            // if processing of the message resulted in an error, set up context and reply method accordingly
-            if (outMessage instanceof ErrorMessage)
-            {
-                context.setStatus(MessageIOConstants.STATUS_ERR);
-                replyMethodName = MessageIOConstants.STATUS_METHOD;
-            }
-            else
-            {
-                replyMethodName = MessageIOConstants.RESULT_METHOD;
-            }
-        }
-        catch (MessageException e)
-        {
-            context.setStatus(MessageIOConstants.STATUS_ERR);
-            replyMethodName = MessageIOConstants.STATUS_METHOD;
-
-            outMessage = e.createErrorMessage();
-            ((ErrorMessage)outMessage).setCorrelationId(inMessage.getMessageId());
-            ((ErrorMessage)outMessage).setDestination(inMessage.getDestination());
-            ((ErrorMessage)outMessage).setClientId(inMessage.getClientId());
-
-            e.logAtHingePoint(inMessage, (ErrorMessage)outMessage, null /* Use default message intros */);
-        }
-        catch (Throwable t)
-        {
-            // Handle any uncaught failures. The normal exception path on the server
-            // is to throw MessageExceptions which are handled in the catch block above,
-            // so if that was skipped we have an overlooked or serious problem.
-            context.setStatus(MessageIOConstants.STATUS_ERR);
-            replyMethodName = MessageIOConstants.STATUS_METHOD;
-
-            String lmeMessage = t.getMessage();
-            if (lmeMessage == null)
-                lmeMessage = t.getClass().getName();
-
-            MessageException lme = new MessageException();
-            lme.setMessage(UNHANDLED_ERROR, new Object[] {lmeMessage});
-
-            outMessage = lme.createErrorMessage();
-            ((ErrorMessage)outMessage).setCorrelationId(inMessage.getMessageId());
-            ((ErrorMessage)outMessage).setDestination(inMessage.getDestination());
-            ((ErrorMessage)outMessage).setClientId(inMessage.getClientId());
-
-            if (Log.isError())
-            {
-                Log.getLogger(LOG_CATEGORY).error("Unhandled error when processing a message: " +
-                        t.toString() + StringUtils.NEWLINE +
-                        "  incomingMessage: " + inMessage + StringUtils.NEWLINE +
-                        "  errorReply: " + outMessage + StringUtils.NEWLINE +
-                        ExceptionUtil.exceptionFollowedByRootCausesToString(t) + StringUtils.NEWLINE);
-            }
-        }
-        finally
-        {
-            // If MPI is enabled update the MPI metrics on the object referred to by the context
-            // and the messages
-            if (context.isRecordMessageSizes() || context.isRecordMessageTimes())
-            {
-                MessagePerformanceUtils.updateOutgoingMPI(context, inMessage, outMessage);
-            }
-
-            // If our channel-endpoint combination supports small messages, and
-            // if we know the current protocol version supports small messages,
-            // try to replace the message...
-            FlexSession session = FlexContext.getFlexSession();
-            if (session != null && session.useSmallMessages()
-                    && !context.isLegacy()
-                    && context.getVersion() >= MessageIOConstants.AMF3
-                    && outMessage instanceof Message)
-            {
-                outMessage = endpoint.convertToSmallMessage((Message)outMessage);
-            }
-
-            response.setReplyMethod(replyMethodName);
-            response.setData(outMessage);
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package flex.messaging.endpoints.amf;
+
+import flex.messaging.FlexContext;
+import flex.messaging.FlexSession;
+import flex.messaging.MessageException;
+import flex.messaging.endpoints.AbstractEndpoint;
+import flex.messaging.io.MessageIOConstants;
+import flex.messaging.io.amf.ActionContext;
+import flex.messaging.io.amf.MessageBody;
+import flex.messaging.messages.CommandMessage;
+import flex.messaging.messages.ErrorMessage;
+import flex.messaging.messages.Message;
+import flex.messaging.messages.MessagePerformanceUtils;
+import flex.messaging.services.MessageService;
+import flex.messaging.util.StringUtils;
+import flex.messaging.log.LogCategories;
+import flex.messaging.log.Log;
+import flex.messaging.util.ExceptionUtil;
+import flex.messaging.util.UUIDUtils;
+
+import java.util.List;
+import java.lang.reflect.Array;
+
+/**
+ * A simple bridge between the encoding/decoding functionality of
+ * the AMF endpoint and the MessageBroker: this last filter in the
+ * chain returns the message to the MessageBroker, which will then
+ * locate the correct service to handle the message.
+ */
+public class MessageBrokerFilter extends AMFFilter
+{
+    private static final int UNHANDLED_ERROR = 10000;
+    static final String LOG_CATEGORY = LogCategories.MESSAGE_GENERAL;
+
+    protected AbstractEndpoint endpoint;
+
+    public MessageBrokerFilter(AbstractEndpoint endpoint)
+    {
+        this.endpoint = endpoint;
+    }
+
+    public void invoke(final ActionContext context)
+    {
+        MessageBody request = context.getRequestMessageBody();
+        MessageBody response = context.getResponseMessageBody();
+
+        Message inMessage = request.getDataAsMessage();
+
+        Object outMessage = null;
+
+        String replyMethodName = MessageIOConstants.STATUS_METHOD;
+
+        try
+        {
+            // Look up or create the correct FlexClient.
+            endpoint.setupFlexClient(inMessage);
+
+            // Assign a clientId if necessary.
+            // We don't need to assign clientIds to general poll requests.
+            if (inMessage.getClientId() == null &&
+                (!(inMessage instanceof CommandMessage) || ((CommandMessage)inMessage).getOperation() != CommandMessage.POLL_OPERATION))
+            {
+                Object clientId = UUIDUtils.createUUID();
+                inMessage.setClientId(clientId);
+            }
+
+            // Messages received via the AMF channel can be batched (by NetConnection on the client) and
+            // we must not put the handler thread into a poll-wait state if a poll command message is followed by
+            // or preceded by other messages in the batch; the request-response loop must complete without waiting.
+            // If the poll command is the only message in the batch it's ok to wait.
+            // If it isn't ok to wait, tag the poll message with a header that short-circuits any potential poll-wait.
+            if (inMessage instanceof CommandMessage)
+            {
+                CommandMessage command = (CommandMessage)inMessage;
+                if ((command.getOperation() == CommandMessage.POLL_OPERATION) && (context.getRequestMessage().getBodyCount() != 1))
+                    command.setHeader(CommandMessage.SUPPRESS_POLL_WAIT_HEADER, Boolean.TRUE);
+            }
+
+            // If MPI is enabled update the MPI metrics on the object referred to by the context
+            // and the messages
+            if (context.isMPIenabled())
+                MessagePerformanceUtils.setupMPII(context, inMessage);
+
+            // Service the message.
+            outMessage = endpoint.serviceMessage(inMessage);
+
+            // if processing of the message resulted in an error, set up context and reply method accordingly
+            if (outMessage instanceof ErrorMessage)
+            {
+                context.setStatus(MessageIOConstants.STATUS_ERR);
+                replyMethodName = MessageIOConstants.STATUS_METHOD;
+            }
+            else
+            {
+                replyMethodName = MessageIOConstants.RESULT_METHOD;
+            }
+        }
+        catch (MessageException e)
+        {
+            context.setStatus(MessageIOConstants.STATUS_ERR);
+            replyMethodName = MessageIOConstants.STATUS_METHOD;
+
+            outMessage = e.createErrorMessage();
+            ((ErrorMessage)outMessage).setCorrelationId(inMessage.getMessageId());
+            ((ErrorMessage)outMessage).setDestination(inMessage.getDestination());
+            ((ErrorMessage)outMessage).setClientId(inMessage.getClientId());
+
+            e.logAtHingePoint(inMessage, (ErrorMessage)outMessage, null /* Use default message intros */);
+        }
+        catch (Throwable t)
+        {
+            // Handle any uncaught failures. The normal exception path on the server
+            // is to throw MessageExceptions which are handled in the catch block above,
+            // so if that was skipped we have an overlooked or serious problem.
+            context.setStatus(MessageIOConstants.STATUS_ERR);
+            replyMethodName = MessageIOConstants.STATUS_METHOD;
+
+            String lmeMessage = t.getMessage();
+            if (lmeMessage == null)
+                lmeMessage = t.getClass().getName();
+
+            MessageException lme = new MessageException();
+            lme.setMessage(UNHANDLED_ERROR, new Object[] {lmeMessage});
+
+            outMessage = lme.createErrorMessage();
+            ((ErrorMessage)outMessage).setCorrelationId(inMessage.getMessageId());
+            ((ErrorMessage)outMessage).setDestination(inMessage.getDestination());
+            ((ErrorMessage)outMessage).setClientId(inMessage.getClientId());
+
+            if (Log.isError())
+            {
+                Log.getLogger(LOG_CATEGORY).error("Unhandled error when processing a message: " +
+                        t.toString() + StringUtils.NEWLINE +
+                        "  incomingMessage: " + inMessage + StringUtils.NEWLINE +
+                        "  errorReply: " + outMessage + StringUtils.NEWLINE +
+                        ExceptionUtil.exceptionFollowedByRootCausesToString(t) + StringUtils.NEWLINE);
+            }
+        }
+        finally
+        {
+            // If MPI is enabled update the MPI metrics on the object referred to by the context
+            // and the messages
+            if (context.isRecordMessageSizes() || context.isRecordMessageTimes())
+            {
+                MessagePerformanceUtils.updateOutgoingMPI(context, inMessage, outMessage);
+            }
+
+            // If our channel-endpoint combination supports small messages, and
+            // if we know the current protocol version supports small messages,
+            // try to replace the message...
+            FlexSession session = FlexContext.getFlexSession();
+            if (session != null && session.useSmallMessages()
+                    && !context.isLegacy()
+                    && context.getVersion() >= MessageIOConstants.AMF3
+                    && outMessage instanceof Message)
+            {
+                outMessage = endpoint.convertToSmallMessage((Message)outMessage);
+            }
+
+            response.setReplyMethod(replyMethodName);
+            response.setData(outMessage);
+        }
+    }
+}


Mime
View raw message