Return-Path: X-Original-To: apmail-tomcat-dev-archive@www.apache.org Delivered-To: apmail-tomcat-dev-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4B7D4DAB8 for ; Thu, 13 Dec 2012 22:56:31 +0000 (UTC) Received: (qmail 74771 invoked by uid 500); 13 Dec 2012 22:56:30 -0000 Delivered-To: apmail-tomcat-dev-archive@tomcat.apache.org Received: (qmail 74699 invoked by uid 500); 13 Dec 2012 22:56:30 -0000 Mailing-List: contact dev-help@tomcat.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: "Tomcat Developers List" Delivered-To: mailing list dev@tomcat.apache.org Received: (qmail 74690 invoked by uid 99); 13 Dec 2012 22:56:30 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Dec 2012 22:56:30 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Dec 2012 22:56:28 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9EFBC2388AB8 for ; Thu, 13 Dec 2012 22:56:08 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1421602 - in /tomcat/trunk/java: javax/websocket/RemoteEndpoint.java org/apache/tomcat/websocket/WsRemoteEndpoint.java Date: Thu, 13 Dec 2012 22:56:08 -0000 To: dev@tomcat.apache.org From: markt@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121213225608.9EFBC2388AB8@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: markt Date: Thu Dec 13 22:56:06 2012 New Revision: 1421602 URL: http://svn.apache.org/viewvc?rev=1421602&view=rev Log: WebSocket 1.0 implementation part 15 of many Implement enough of the send message code that the
WebSocket example works again using the new annotation endpoint Modified: tomcat/trunk/java/javax/websocket/RemoteEndpoint.java tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java Modified: tomcat/trunk/java/javax/websocket/RemoteEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/javax/websocket/RemoteEndpoint.java?rev=1421602&r1=1421601&r2=1421602&view=diff ============================================================================== --- tomcat/trunk/java/javax/websocket/RemoteEndpoint.java (original) +++ tomcat/trunk/java/javax/websocket/RemoteEndpoint.java Thu Dec 13 22:56:06 2012 @@ -25,8 +25,18 @@ import java.util.concurrent.Future; public interface RemoteEndpoint { + /** + * Send the message, blocking until the message is sent. + * @param text The text message to send. + * @throws IOException + */ void sendString(String text) throws IOException; + /** + * Send the message, blocking until the message is sent. + * @param data The binary message to send + * @throws IOException + */ void sendBytes(ByteBuffer data) throws IOException; void sendPartialString(String fragment, boolean isLast) throws IOException; Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java?rev=1421602&r1=1421601&r2=1421602&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpoint.java Thu Dec 13 22:56:06 2012 @@ -20,6 +20,12 @@ import java.io.IOException; import java.io.OutputStream; import java.io.Writer; import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CoderResult; +import java.util.concurrent.BrokenBarrierException; +import 
java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; import javax.servlet.ServletOutputStream; @@ -31,38 +37,83 @@ import javax.websocket.SendResult; public class WsRemoteEndpoint implements RemoteEndpoint { private final ServletOutputStream sos; + // Max length for outgoing WebSocket frame header is 10 bytes + private final ByteBuffer header = ByteBuffer.allocate(10); + + private final ByteBuffer textToByte = ByteBuffer.allocate(8192); + private final CharsetEncoder encoder = Charset.forName("UTF8").newEncoder(); + private volatile Boolean isText = null; + private volatile CyclicBarrier writeBarrier = new CyclicBarrier(2); + public WsRemoteEndpoint(ServletOutputStream sos) { this.sos = sos; } - public void onWritePossible() { - // TODO - } @Override public void sendString(String text) throws IOException { - // TODO Auto-generated method stub + if (isText != null) { + // Another message is being sent using fragments + // TODO i18n + throw new IllegalStateException(); + } + sendPartialString(text, true); } @Override public void sendBytes(ByteBuffer data) throws IOException { - // TODO Auto-generated method stub + if (isText != null) { + // Another message is being sent using fragments + // TODO i18n + throw new IllegalStateException(); + } + sendPartialBytes(data, true); } @Override public void sendPartialString(String fragment, boolean isLast) throws IOException { - // TODO Auto-generated method stub + + if (isText != null && !isText.booleanValue()) { + // Can't write a text fragment in the middle of a binary message + // TODO i18n + throw new IllegalStateException(); + } + + boolean first = (isText == null); + encoder.reset(); + textToByte.clear(); + CharBuffer cb = CharBuffer.wrap(fragment); + CoderResult cr = encoder.encode(cb, textToByte, true); + while (cr.isOverflow()) { + sendMessage(Constants.OPCODE_TEXT, textToByte, first, false); + first = false; + } + sendMessage(Constants.OPCODE_TEXT, textToByte, first, isLast); + if (!isLast) { + 
isText = Boolean.FALSE; + } } @Override public void sendPartialBytes(ByteBuffer partialByte, boolean isLast) throws IOException { - // TODO Auto-generated method stub + + if (isText != null && isText.booleanValue()) { + // Can't write a binary fragment in the middle of a text message + // TODO i18n + throw new IllegalStateException(); + } + + boolean first = (isText == null); + sendMessage(Constants.OPCODE_BINARY, partialByte, first, isLast); + if (!isLast) { + isText = Boolean.FALSE; + } } @@ -127,12 +178,92 @@ public class WsRemoteEndpoint implements @Override public void sendPing(ByteBuffer applicationData) { - // TODO Auto-generated method stub + sendMessage(Constants.OPCODE_PING, applicationData, true, true); } @Override public void sendPong(ByteBuffer applicationData) { - // TODO Auto-generated method stub + sendMessage(Constants.OPCODE_PONG, applicationData, true, true); + } + + + public void onWritePossible() { + try { + writeBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + + private void sendMessage(byte opCode, ByteBuffer message, + boolean isFirstFragment, boolean isLastFragment) { + // Clear header, ready for new message + header.clear(); + byte first = 0; + + if (isLastFragment) { + // Set the fin bit + first = -128; + } + + if (isFirstFragment) { + // This is the first fragment of this message + first = (byte) (first + opCode); + } + // If not the first fragment, it is a continuation with opCode of zero + + message.flip(); + header.put(first); + + // Next write the length + if (message.limit() < 126) { + header.put((byte) message.limit()); + } else if (message.limit() < 65536) { + header.put((byte) 126); + header.put((byte) (message.limit() >>> 8)); + header.put((byte) (message.limit() & 0xFF)); + } else { + // Will never be more than 2^31-1 + header.put((byte) 127); + header.put((byte) 0); + header.put((byte) 0); + header.put((byte) 0); + 
header.put((byte) 0); + header.put((byte) (message.limit() >>> 24)); + header.put((byte) (message.limit() >>> 16)); + header.put((byte) (message.limit() >>> 8)); + header.put((byte) (message.limit() & 0xFF)); + } + header.flip(); + + doBlockingWrite(header); + doBlockingWrite(message); + try { + sos.flush(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + + private void doBlockingWrite(ByteBuffer data) { + if (!sos.canWrite()) { + try { + writeBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + try { + sos.write(data.array(), data.arrayOffset(), data.limit()); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org For additional commands, e-mail: dev-help@tomcat.apache.org