tomcat-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject svn commit: r1640688 - in /tomcat/trunk: java/org/apache/tomcat/websocket/ test/org/apache/tomcat/websocket/
Date Thu, 20 Nov 2014 06:30:09 GMT
Author: markt
Date: Thu Nov 20 06:30:08 2014
New Revision: 1640688

URL: http://svn.apache.org/r1640688
Log:
Fix various problems identified with flushing batched messages:
- Flush triggered by disabling batching failed to flip buffer before writing and also failed
to clear the buffer after writing was complete. This resulted in duplicated and/or corrupted
messages.
- The flush triggered by session close was too late since no writes are permitted once the
close process starts. This resulted in an exception being thrown.

Modified:
    tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
    tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1640688&r1=1640687&r2=1640688&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Thu Nov 20 06:30:08
2014
@@ -70,7 +70,7 @@ wsRemoteEndpoint.closedOutputStream=This
 wsRemoteEndpoint.closedWriter=This method may not be called as the Writer has been closed
 wsRemoteEndpoint.changeType=When sending a fragmented message, all fragments bust be of the
same type
 wsRemoteEndpoint.concurrentMessageSend=Messages may not be sent concurrently even when using
the asynchronous send messages. The client must wait for the previous message to complete
before sending the next.
-wsRemoteEndpoint.flushOnCloseFailed=Flushing batched messages before closing the session
failed
+wsRemoteEndpoint.flushOnCloseFailed=Batched messages still enabled after session has been closed. Unable to flush remaining batched message.
 wsRemoteEndpoint.invalidEncoder=The specified encoder of type [{0}] could not be instantiated
 wsRemoteEndpoint.noEncoder=No encoder specified for object of class [{0}]
 wsRemoteEndpoint.wrongState=The remote endpoint was in state [{0}] which is an invalid state
for called method
@@ -88,6 +88,7 @@ wsSession.duplicateHandlerBinary=A binar
 wsSession.duplicateHandlerPong=A pong message handler has already been configured
 wsSession.duplicateHandlerText=A text message handler has already been configured
 wsSession.invalidHandlerTypePong=A pong message handler must implement MessageHandler.Basic
+wsSession.flushFailOnClose=Failed to flush batched messages on session close
 wsSession.messageFailed=Unable to write the complete message as the WebSocket connection
has been closed
 wsSession.sendCloseFail=Failed to send close message to remote endpoint
 wsSession.removeHandlerFailed=Unable to remove the handler [{0}] as it was not registered
with this session

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1640688&r1=1640687&r2=1640688&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Thu Nov 20
06:30:08 2014
@@ -302,13 +302,10 @@ public abstract class WsRemoteEndpointIm
 
         boolean doWrite = false;
         synchronized (messagePartLock) {
-            if (Constants.OPCODE_CLOSE == mp.getOpCode()) {
-                try {
-                    setBatchingAllowed(false);
-                } catch (IOException e) {
-                    log.warn(sm.getString(
-                            "wsRemoteEndpoint.flushOnCloseFailed"), e);
-                }
+            if (Constants.OPCODE_CLOSE == mp.getOpCode() && getBatchingAllowed()) {
+                // Should not happen. Too late to send batched messages now since
+                // the session has been closed. Complain loudly.
+                log.warn(sm.getString("wsRemoteEndpoint.flushOnCloseFailed"));
             }
             if (messagePartInProgress) {
                 // When a control message is sent while another message is being
@@ -382,7 +379,10 @@ public abstract class WsRemoteEndpointIm
         if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
             nextFragmented = fragmented;
             nextText = text;
-            doWrite(mp.getEndHandler(), outputBuffer);
+            outputBuffer.flip();
+            SendHandler flushHandler = new OutputBufferFlushSendHandler(
+                    outputBuffer, mp.getEndHandler());
+            doWrite(flushHandler, outputBuffer);
             return;
         }
 
@@ -836,6 +836,30 @@ public abstract class WsRemoteEndpointIm
         }
     }
 
+
+    /**
+     * Ensures that the output buffer is cleared after it has been flushed.
+     */
+    private static class OutputBufferFlushSendHandler implements SendHandler {
+
+        private final ByteBuffer outputBuffer;
+        private final SendHandler handler;
+
+        public OutputBufferFlushSendHandler(ByteBuffer outputBuffer, SendHandler handler) {
+            this.outputBuffer = outputBuffer;
+            this.handler = handler;
+        }
+
+        @Override
+        public void onResult(SendResult result) {
+            if (result.isOK()) {
+                outputBuffer.clear();
+            }
+            handler.onResult(result);
+        }
+    }
+
+
     private class WsOutputStream extends OutputStream {
 
         private final WsRemoteEndpointImplBase endpoint;

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1640688&r1=1640687&r2=1640688&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Thu Nov 20 06:30:08 2014
@@ -459,6 +459,13 @@ public class WsSession implements Sessio
                 return;
             }
 
+            try {
+                wsRemoteEndpoint.setBatchingAllowed(false);
+            } catch (IOException e) {
+                log.warn(sm.getString("wsSession.flushFailOnClose"), e);
+                fireEndpointOnError(e);
+            }
+
             state = State.CLOSING;
 
             sendCloseMessage(closeReasonMessage);
@@ -487,6 +494,12 @@ public class WsSession implements Sessio
 
         synchronized (stateLock) {
             if (state == State.OPEN) {
+                try {
+                    wsRemoteEndpoint.setBatchingAllowed(false);
+                } catch (IOException e) {
+                    log.warn(sm.getString("wsSession.flushFailOnClose"), e);
+                    fireEndpointOnError(e);
+                }
                 sendCloseMessage(closeReason);
                 fireEndpointOnClose(closeReason);
                 state = State.CLOSED;
@@ -497,7 +510,6 @@ public class WsSession implements Sessio
         }
     }
 
-
     private void fireEndpointOnClose(CloseReason closeReason) {
 
         // Fire the onClose event
@@ -515,6 +527,21 @@ public class WsSession implements Sessio
     }
 
 
+
+    private void fireEndpointOnError(Throwable throwable) {
+
+        // Fire the onError event
+        Thread t = Thread.currentThread();
+        ClassLoader cl = t.getContextClassLoader();
+        t.setContextClassLoader(applicationClassLoader);
+        try {
+            localEndpoint.onError(this, throwable);
+        } finally {
+            t.setContextClassLoader(cl);
+        }
+    }
+
+
     private void sendCloseMessage(CloseReason closeReason) {
         // 125 is maximum size for the payload of a control message
         ByteBuffer msg = ByteBuffer.allocate(125);

Modified: tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java?rev=1640688&r1=1640687&r2=1640688&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java (original)
+++ tomcat/trunk/test/org/apache/tomcat/websocket/TesterFirehoseServer.java Thu Nov 20 06:30:08
2014
@@ -119,10 +119,14 @@ public class TesterFirehoseServer {
 
             for (int i = 0; i < MESSAGE_COUNT; i++) {
                 remote.sendText(MESSAGE);
+                if (i % (MESSAGE_COUNT * 0.4) == 0) {
+                    remote.setBatchingAllowed(false);
+                    remote.setBatchingAllowed(true);
+                }
             }
 
-            // Ensure remaining messages are flushed
-            remote.setBatchingAllowed(false);
+            // Flushing should happen automatically on session close
+            session.close();
         }
 
         @OnError



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Mime
View raw message