tomcat-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Pesonen, Harri" <harri.peso...@sap.com>
Subject RE: Tomcat WebSocket does not always send asynchronous messages
Date Wed, 08 Mar 2017 15:48:38 GMT
Here are my versions of these test files:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tomcat.websocket.server;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;

import javax.websocket.ClientEndpointConfig;
import javax.websocket.ContainerProvider;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

import org.apache.catalina.Context;
import org.apache.catalina.servlets.DefaultServlet;
import org.apache.catalina.startup.Tomcat;
import org.apache.catalina.startup.TomcatBaseTest;
import org.apache.tomcat.websocket.TesterAsyncTiming;
import org.apache.tomcat.websocket.TesterMessageCountClient.TesterProgrammaticEndpoint;
import org.junit.Assert;
import org.junit.Test;

//@Ignore // Test passes but GC delays can introduce false failures.
public class TestAsyncMessages extends TomcatBaseTest {

    @Test
    public void testAsyncTiming() throws Exception {

        Tomcat tomcat = getTomcatInstance();
        // No file system docBase required
        Context ctx = tomcat.addContext("", null);
        ctx.addApplicationListener(TesterAsyncTiming.Config.class.getName());
        DefaultServlet defaultServlet = new DefaultServlet();
        WebSocketContainer wsContainer =
                ContainerProvider.getWebSocketContainer();

        Tomcat.addServlet(ctx, "default", defaultServlet);
        ctx.addServletMappingDecoded("/", "default");

        ctx.addParameter(Constants.
                BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM, "" + TesterAsyncTiming.Config.LARGE_DATA_SIZE);
        ctx.addParameter(org.apache.tomcat.websocket.server.Constants.
                TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM, "" + TesterAsyncTiming.Config.LARGE_DATA_SIZE);
        wsContainer.setDefaultMaxBinaryMessageBufferSize(TesterAsyncTiming.Config.LARGE_DATA_SIZE);
        wsContainer.setDefaultMaxTextMessageBufferSize(TesterAsyncTiming.Config.LARGE_DATA_SIZE);

        tomcat.start();

        ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build();
        Session wsSession = wsContainer.connectToServer(
                TesterProgrammaticEndpoint.class,
                clientEndpointConfig,
                new URI("ws://localhost:" + getPort() + TesterAsyncTiming.Config.PATH));

        AsyncTimingClientHandler handler = new AsyncTimingClientHandler();
        wsSession.addMessageHandler(ByteBuffer.class, handler);
        wsSession.getBasicRemote().sendText("Hello");

        System.out.println("Sent Hello message, waiting for data");
        handler.waitForLatch();
        Assert.assertFalse(handler.hasFailed());
    }

    private static class AsyncTimingClientHandler implements MessageHandler.Partial<ByteBuffer>
{

        private long lastMessage = 0;
        private int sequence = 0;
        private int count = 0;
        private CountDownLatch latch = new CountDownLatch(1);
        private volatile boolean fail = false;
        private long minDelayLong = TesterAsyncTiming.Config.SLEEP_MILLI - 20;
        private long maxDelayLong = TesterAsyncTiming.Config.SLEEP_MILLI + 20;
        private long maxDelayShort = 20;

        @Override
        public void onMessage(ByteBuffer message, boolean last) {
            if (lastMessage == 0) {
                // First message. Don't check
                sequence ++;
                lastMessage = System.currentTimeMillis();
            } else {
                long newTime = System.currentTimeMillis();
                long diff = newTime - lastMessage;
                lastMessage = newTime;
                if (sequence == 0) {
                    sequence++;
                    if (message.capacity() != TesterAsyncTiming.Config.LARGE_DATA_SIZE) {
                        System.out.println(
                                "Expected size " + TesterAsyncTiming.Config.LARGE_DATA_SIZE
+ " but was [" + message
                                        .capacity() + "], count [" + count + "]");
                        fail = true;
                    }
                    if (diff < minDelayLong) {
                        System.out.println(
                                "Expected diff > " + minDelayLong + " ms but was [" + diff
+ "], count [" + count
                                        + "]");
                        fail = true;
                    } else if (diff > maxDelayLong) {
                        System.out.println(
                                "Expected diff < " + maxDelayLong + " ms but was [" + diff
+ "], count [" + count
                                        + "]");
                        fail = true;
                    }
                } else if (sequence == 1) {
                    sequence = 0;
                    if (message.capacity() != TesterAsyncTiming.Config.SMALL_DATA_SIZE) {
                        System.out.println(
                                "Expected size " + TesterAsyncTiming.Config.SMALL_DATA_SIZE
+ " but was [" + message
                                        .capacity() + "], count [" + count + "]");
                        fail = true;
                    }
                    if (diff > maxDelayShort) {
                        System.out.println(
                                "Expected diff < " + maxDelayShort + " ms but was [" +
diff + "], count [" + count
                                        + "]");
                        fail = true;
                    }
                }
            }
            count ++;
            if (count >= TesterAsyncTiming.Config.ITERATIONS * 2) {
                latch.countDown();
            }
        }

        public void waitForLatch() throws InterruptedException {
            latch.await();
        }

        public boolean hasFailed() {
            return fail;
        }
    }
}

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tomcat.websocket;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;

import javax.websocket.OnMessage;
import javax.websocket.RemoteEndpoint.Async;
import javax.websocket.SendHandler;
import javax.websocket.SendResult;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import org.apache.tomcat.websocket.server.TesterEndpointConfig;

public class TesterAsyncTiming {

    public static class Config extends TesterEndpointConfig {

        public static final String PATH = "/timing";
        public static final int ITERATIONS = 100;
        public static final int SLEEP_MILLI = 100;
        public static final int LARGE_DATA_SIZE = 16 * 1024;
        public static final int SMALL_DATA_SIZE = 1904;

        @Override
        protected Class<?> getEndpointClass() {
            return Endpoint.class;
        }
    }

    @ServerEndpoint(Config.PATH)
    public static class Endpoint {

        private static final ByteBuffer LARGE_DATA= ByteBuffer.allocate(Config.LARGE_DATA_SIZE);
        private static final ByteBuffer SMALL_DATA= ByteBuffer.allocate(Config.SMALL_DATA_SIZE);

        @OnMessage
        public void onMessage(Session session, @SuppressWarnings("unused") String text) {

            Semaphore semaphore = new Semaphore(1);
            SendHandler handler = new SemaphoreSendHandler(semaphore);

            Async remote = session.getAsyncRemote();
            int i = 0;
            while (true) {
                try {
                    semaphore.acquire(1);
                    remote.sendBinary(LARGE_DATA, handler);
                    semaphore.acquire(1);
                    remote.sendBinary(SMALL_DATA, handler);
                    if (i >= Config.ITERATIONS - 1)
                        break;
                    Thread.sleep(Config.SLEEP_MILLI);
                    LARGE_DATA.flip();
                    SMALL_DATA.flip();
                    i++;
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }
        }

        private class SemaphoreSendHandler implements SendHandler {

            private final Semaphore semaphore;

            private SemaphoreSendHandler(Semaphore semaphore) {
                this.semaphore = semaphore;
            }

            @Override
            public void onResult(SendResult result) {
                semaphore.release();
            }
        }
    }
}

-Harri

-----Original Message-----
From: Pesonen, Harri [mailto:harri.pesonen@sap.com] 
Sent: 8. maaliskuuta 2017 16:32
To: Tomcat Users List <users@tomcat.apache.org>
Subject: RE: Tomcat WebSocket does not always send asynchronous messages

Hello, and sorry for top-posting, I don't know how to configure Outlook to do it differently.

I was finally able to run your test. I had a lot of trouble doing it:
* did not have SVN, downloaded TortoiseSVN
* tried to open the project in IDEA, but failed miserably, I really hope that there was pom.xml
* was able to build whole Tomcat and test using ant command line, but it took so long, had
to abort
* was not able to run this single test with ant:

Testsuite: org.apache.tomcat.websocket.server.TestAsyncMessages.java
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0 sec

	Caused an ERROR
org.apache.tomcat.websocket.server.TestAsyncMessages.java
java.lang.ClassNotFoundException: org.apache.tomcat.websocket.server.TestAsyncMessages.java

* but was able to make the Eclipse project with "ant ide-eclipse"
* was able to run the unit test in Eclipse:

08-Mar-2017 14:14:40.538 INFO [main] org.apache.catalina.startup.LoggingBaseTest.setUp Starting
test case [testAsyncTiming]
08-Mar-2017 14:14:42.676 INFO [main] org.apache.coyote.AbstractProtocol.init Initializing
ProtocolHandler ["http-nio-127.0.0.1-auto-1"]
08-Mar-2017 14:14:42.778 INFO [main] org.apache.tomcat.util.net.NioSelectorPool.getSharedSelector
Using a shared selector for servlet write/read
08-Mar-2017 14:14:42.800 INFO [main] org.apache.catalina.core.StandardService.startInternal
Starting service Tomcat
08-Mar-2017 14:14:42.802 INFO [main] org.apache.catalina.core.StandardEngine.startInternal
Starting Servlet Engine: Apache Tomcat/@VERSION@
08-Mar-2017 14:14:43.213 INFO [main] org.apache.coyote.AbstractProtocol.start Starting ProtocolHandler
[http-nio-127.0.0.1-auto-1-54783]
Sent Hello message, waiting for data
Expected diff < 500,000 but was [6054390], count [2]
Expected diff < 500,000 but was [1015710], count [14]
Expected diff < 500,000 but was [642270], count [25]
Expected diff < 500,000 but was [1712852], count [26]
Expected diff < 500,000 but was [595293], count [41]
Expected diff < 500,000 but was [792673], count [61]
Expected diff < 500,000 but was [799777], count [62]
Expected diff < 500,000 but was [531738], count [68]
Expected diff < 500,000 but was [532922], count [76]
Expected diff < 500,000 but was [673851], count [98]
Expected diff < 500,000 but was [538054], count [133]
Expected diff < 500,000 but was [747276], count [158]
Expected diff < 500,000 but was [794646], count [262]
Expected diff < 500,000 but was [1290461], count [263]
Expected diff < 500,000 but was [1013341], count [296]
Expected diff < 500,000 but was [582267], count [311]
Expected diff < 500,000 but was [1377703], count [337]
Expected diff < 500,000 but was [1698245], count [338]
Expected diff < 500,000 but was [1303488], count [424]
Expected diff < 500,000 but was [965181], count [425]
Expected diff < 500,000 but was [534896], count [455]
Expected diff < 500,000 but was [847938], count [458]
Expected diff < 500,000 but was [883862], count [473]
Expected diff < 500,000 but was [1026368], count [475]
Expected diff < 500,000 but was [1096241], count [476]
Expected diff < 500,000 but was [518710], count [481]
Expected diff < 500,000 but was [1053607], count [482]
Expected diff < 500,000 but was [641481], count [500]
Expected diff < 500,000 but was [565292], count [512]
Expected diff < 500,000 but was [808857], count [556]
Expected diff < 500,000 but was [643455], count [653]
Expected diff < 500,000 but was [508447], count [670]
Expected diff < 500,000 but was [960839], count [671]
Expected diff < 500,000 but was [954918], count [683]
Expected diff < 500,000 but was [601215], count [749]
Expected diff < 500,000 but was [561345], count [752]
Expected diff < 500,000 but was [688062], count [935]
Expected diff < 500,000 but was [1405730], count [937]
Expected diff < 500,000 but was [1414415], count [938]
Expected diff < 500,000 but was [1284935], count [941]
Expected diff < 500,000 but was [516737], count [995]
Expected diff < 500,000 but was [587398], count [1067]
Expected diff < 500,000 but was [946233], count [1079]
Expected diff < 500,000 but was [5403041], count [1114]
Expected diff < 500,000 but was [1181114], count [1115]
Expected diff < 500,000 but was [554239], count [1118]
Expected diff < 500,000 but was [1437706], count [1121]
Expected diff < 500,000 but was [577925], count [1240]
Expected diff < 500,000 but was [1226115], count [1241]
Expected diff < 500,000 but was [2194850], count [1285]
Expected diff < 500,000 but was [522264], count [1292]
Expected diff < 500,000 but was [845964], count [1328]
Expected diff < 500,000 but was [3652294], count [1331]
Expected diff < 500,000 but was [727538], count [1343]
Expected diff < 500,000 but was [809252], count [1349]
Expected diff < 500,000 but was [1597188], count [1393]
Expected diff < 500,000 but was [525816], count [1394]
08-Mar-2017 14:15:09.251 INFO [main] org.apache.coyote.AbstractProtocol.pause Pausing ProtocolHandler
["http-nio-127.0.0.1-auto-1-54783"]
08-Mar-2017 14:15:09.266 INFO [main] org.apache.catalina.core.StandardService.stopInternal
Stopping service Tomcat

Then I was able to run the test also in IDEA, by importing Eclipse project and modifying the
libraries.

I changed the big message size to 16384 bytes and small size to 1904 bytes (the most common
problem that we have seen).
So need to add the following to servlet configuration:

	<context-param>
		<param-name>org.apache.tomcat.websocket.binaryBufferSize</param-name>
		<param-value>16384</param-value>
	</context-param>
	<context-param>
		<param-name>org.apache.tomcat.websocket.textBufferSize</param-name>
		<param-value>16384</param-value>
	</context-param>

Like this:

        ctx.addParameter(Constants.
                BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM, "16384");
        ctx.addParameter(org.apache.tomcat.websocket.server.Constants.
                TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM, "16384");
        wsContainer.setDefaultMaxBinaryMessageBufferSize(16384);
        wsContainer.setDefaultMaxTextMessageBufferSize(16384);

Then I changed the test logic so that there are only 2 messages, one big and one small, but
it fails randomly like the original test.
Also added test for maximum delay for the big message:

                    } else if (diff > 60000000) {
                        System.out.println("Expected diff < 60ms but was [" + diff + "],
count [" + count + "]");
                        fail = true;
                    }

Sent Hello message, waiting for data
Expected diff < 500,000 but was [718458], count [9]
Expected diff < 500,000 but was [1224142], count [17]
Expected diff < 500,000 but was [1102952], count [19]
Expected diff < 500,000 but was [663587], count [21]
Expected diff < 500,000 but was [537659], count [43]
Expected diff < 500,000 but was [3158452], count [73]
Expected diff < 500,000 but was [582662], count [75]
Expected diff < 500,000 but was [825437], count [121]
Expected diff < 500,000 but was [605953], count [129]
Expected diff < 500,000 but was [1092293], count [143]
Expected diff < 500,000 but was [552660], count [145]
Expected diff < 500,000 but was [7926329], count [177]
Expected diff < 500,000 but was [507657], count [189]
Expected diff < 500,000 but was [913468], count [193]
Expected diff < 500,000 but was [560160], count [229]
Expected diff < 500,000 but was [1748774], count [319]
Expected diff < 500,000 but was [529764], count [321]
Expected diff < 500,000 but was [692009], count [381]
Expected diff < 500,000 but was [556213], count [389]
Expected diff < 500,000 but was [2727772], count [403]
Expected diff < 500,000 but was [937154], count [469]
Expected diff < 500,000 but was [544370], count [513]
Expected diff < 500,000 but was [1018473], count [575]
Expected diff < 500,000 but was [518711], count [601]
Expected diff < 500,000 but was [885441], count [613]
Expected diff < 500,000 but was [2075633], count [645]
Expected diff < 500,000 but was [606347], count [715]
Expected diff < 500,000 but was [561345], count [721]
Expected diff < 500,000 but was [947812], count [889]
Expected diff < 500,000 but was [513974], count [923]
Expected diff < 60ms but was [65475211], count [960]
Expected diff > 40ms but was [35346555], count [962]

So the delay is at most 2 milliseconds, which is acceptable.

I think that you understood my problem quite well. It seems that the problem can't be reproduced
in this test.
I also tried adding sendPing() there but it did not have any effect.
We do not use compression extension.
Thanks for the test! :-)

-Harri

-----Original Message-----
From: Mark Thomas [mailto:markt@apache.org] 
Sent: 7. maaliskuuta 2017 23:58
To: Tomcat Users List <users@tomcat.apache.org>
Subject: Re: Tomcat WebSocket does not always send asynchronous messages

On 07/03/17 14:55, Mark Thomas wrote:
> On 07/03/17 11:03, Mark Thomas wrote:
>> On 07/03/17 08:28, Pesonen, Harri wrote:
>>> Hello, we have a problem that Tomcat WebSocket does not always send asynchronous
messages. This problem is random, and it has been reproduced in Tomcat 8.5.6 and 8.5.11. Synchronized
operations work fine, and also the asynchronous operations work except in one special case.
When there is a big message that we want to send to client, we split it into 16 kB packets
for technical reasons, and then we send them very quickly after each other using
>>>
>>> /**
>>> * Initiates the asynchronous transmission of a binary message. This method returns
before the message
>>> * is transmitted. Developers provide a callback to be notified when the message
has been
>>> * transmitted. Errors in transmission are given to the developer in the SendResult
object.
>>> *
>>> * @param data       the data being sent, must not be {@code null}.
>>> * @param handler the handler that will be notified of progress, must not be {@code
null}.
>>> * @throws IllegalArgumentException if either the data or the handler are {@code
null}.
>>> */
>>> void sendBinary(ByteBuffer data, SendHandler handler);
>>>
>>> Because there can be only one ongoing write to socket, we use Semaphore that
is released on the SendHandler callback:
>>>
>>> public void onResult(javax.websocket.SendResult result) {
>>>     semaphore.release();
>>>
>>> So the code to send is actually:
>>>
>>> semaphore.acquireUninterruptibly();
>>> async.sendBinary(buf, asyncHandler);
>>>
>>> This works fine in most cases. But when we send one 16 kB packet and then immediately
one smaller packet (4 kB), then randomly the smaller packet is not actually sent, but only
after we call
>>>
>>> async.sendPing(new byte[0])
>>>
>>> in another thread. sendPing() is called every 20 seconds to keep the WebSocket
connection alive. This means that the last packet gets extra delay on client, which varies
between 0 - 20 seconds.
>>>
>>> We have an easy workaround to the problem. If we call flushBatch() after each
sendBinary(), then it works great, but this means that the sending is not actually asynchronous,
because flushBatch() is synchronous.
>>> Also we should not be forced to call flushBatch(), because we are not enabling
batching. Instead we make sure that it is disabled:
>>>
>>> if (async.getBatchingAllowed()) {
>>>     async.setBatchingAllowed(false);
>>>
>>> So the working code is:
>>>
>>> semaphore.acquireUninterruptibly();
>>> async.sendBinary(buf, asyncHandler);
>>> async.flushBatch();
>>>
>>> Normally the code works fine without flushBatch(), if there is delay between
the messages, but when we send the messages right after each other, then the last small message
is not always sent immediately.
>>> I looked at the Apache WebSocket code, but it was not clear to me what is happening
there.
>>> Any ideas what is going on here? Any ideas how I could troubleshoot this more?
>>
>> Thanks for providing such a clear description of the problem you are seeing.
>>
>> It sounds like there is a race condition somewhere in the WebSocket
>> code. With the detail you have provided, I think there is a reasonable
>> chance of finding via code inspection.
> 
> Some follow-up questions to help narrow the search.
> 
> This is server side, correct?
> 
> Are you using the compression extension? If yes, do you see the problem
> without it?
> 
> When you say "we split it into 16 kB packets" do you mean you split it
> into multiple WebSocket messages?
> 
> If you insert a short delay before sending the final 4kB does that
> reduce the frequency of the problem?

I've added a (disabled by default) test case to explore the issue
described based on my understanding. It passes for me (ignoring what
look like GC introduced delays) with NIO.

http://svn.apache.org/viewvc?rev=1785893&view=rev

What would be really helpful would be if you could use this as a basis
for providing a test case that demonstrates the problem you are seeing.

Thanks,

Mark

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@tomcat.apache.org
For additional commands, e-mail: users-help@tomcat.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@tomcat.apache.org
For additional commands, e-mail: users-help@tomcat.apache.org


Mime
View raw message