Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 80AB7625D for ; Mon, 25 Jul 2011 11:09:28 +0000 (UTC) Received: (qmail 91711 invoked by uid 500); 25 Jul 2011 11:09:28 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 91618 invoked by uid 500); 25 Jul 2011 11:09:25 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 91609 invoked by uid 99); 25 Jul 2011 11:09:23 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 25 Jul 2011 11:09:23 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 25 Jul 2011 11:09:17 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 746D923888EA for ; Mon, 25 Jul 2011 11:08:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1150630 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/nio/ main/java/org/apache/activemq/transport/stomp/ main/java/org/apache/activemq/transport/tcp/ main/resources/META-INF/services/org/apache/activemq/t... Date: Mon, 25 Jul 2011 11:08:54 -0000 To: commits@activemq.apache.org From: dejanb@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110725110855.746D923888EA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dejanb Date: Mon Jul 25 11:08:52 2011 New Revision: 1150630 URL: http://svn.apache.org/viewvc?rev=1150630&view=rev Log: https://issues.apache.org/jira/browse/AMQ-2583 - stomp+nio+ssl initial implementation Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java?rev=1150630&r1=1150629&r2=1150630&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java Mon Jul 25 11:08:52 2011 @@ -128,6 +128,9 @@ public class NIOOutputStream extends Out */ public void close() throws IOException { super.close(); + if (engine != null) { + engine.closeOutbound(); + } closed = true; } @@ -159,6 +162,7 @@ public class NIOOutputStream extends Out } else { plain = data; } + int remaining = plain.remaining(); int lastRemaining = remaining - 1; long delay = 1; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java?rev=1150630&r1=1150629&r2=1150630&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java Mon Jul 25 11:08:52 2011 @@ -28,6 +28,7 @@ import javax.net.SocketFactory; import javax.net.ssl.*; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.EOFException; import java.io.IOException; import java.net.Socket; import java.net.URI; @@ -36,18 +37,18 @@ import java.nio.ByteBuffer; public class NIOSSLTransport extends NIOTransport { - private boolean needClientAuth; - private boolean wantClientAuth; - private String[] enabledCipherSuites; + protected boolean needClientAuth; + protected boolean wantClientAuth; + protected String[] enabledCipherSuites; protected SSLContext sslContext; protected SSLEngine sslEngine; protected SSLSession sslSession; - boolean handshakeInProgress = false; - SSLEngineResult.Status status = null; - SSLEngineResult.HandshakeStatus handshakeStatus = null; + protected boolean handshakeInProgress = false; + protected SSLEngineResult.Status status = null; + protected SSLEngineResult.HandshakeStatus handshakeStatus = null; public NIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); @@ -90,11 +91,8 @@ public class NIOSSLTransport extends NIO outputStream.setEngine(sslEngine); this.dataOut = new DataOutputStream(outputStream); this.buffOut = outputStream; - sslEngine.beginHandshake(); handshakeStatus = sslEngine.getHandshakeStatus(); - - doHandshake(); } catch (Exception e) { @@ -125,8 +123,6 @@ public class NIOSSLTransport extends NIO } } - - protected void serviceRead() { try { if (handshakeInProgress) { @@ -136,62 +132,75 @@ public class NIOSSLTransport extends NIO ByteBuffer plain = ByteBuffer.allocate(sslSession.getApplicationBufferSize()); plain.position(plain.limit()); - while (true) { - if (nextFrameSize == -1) { - if (!plain.hasRemaining()) { - plain.clear(); - int readCount = secureRead(plain); - if (readCount == 0) - break; - } - nextFrameSize = plain.getInt(); - if (wireFormat instanceof OpenWireFormat) { - long maxFrameSize = ((OpenWireFormat)wireFormat).getMaxFrameSize(); - if (nextFrameSize > maxFrameSize) { - throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB"); - } - } - currentBuffer = ByteBuffer.allocate(nextFrameSize + 4); - currentBuffer.putInt(nextFrameSize); - if (currentBuffer.hasRemaining()) { - if (currentBuffer.remaining() >= plain.remaining()) { - currentBuffer.put(plain); - } else { - byte[] fill = new byte[currentBuffer.remaining()]; - plain.get(fill); - currentBuffer.put(fill); - } - } + while(true) { + if (!plain.hasRemaining()) { + + plain.clear(); + int readCount = secureRead(plain); - if (currentBuffer.hasRemaining()) { - continue; - } else { - currentBuffer.flip(); - Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer))); - doConsume((Command) command); - nextFrameSize = -1; + if (readCount == 0) + break; + + // channel is closed, cleanup + if (readCount== -1) { + onException(new EOFException()); + selection.close(); + break; } } - } + processCommand(plain); + + } } catch (IOException e) { onException(e); } catch (Throwable e) { onException(IOExceptionSupport.create(e)); } + } + protected void processCommand(ByteBuffer plain) throws Exception { + nextFrameSize = plain.getInt(); + if (wireFormat instanceof OpenWireFormat) { + long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize(); + if (nextFrameSize > maxFrameSize) { + throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB"); + } + } + currentBuffer = ByteBuffer.allocate(nextFrameSize + 4); + currentBuffer.putInt(nextFrameSize); + if (currentBuffer.hasRemaining()) { + if (currentBuffer.remaining() >= plain.remaining()) { + currentBuffer.put(plain); + } else { + byte[] fill = new byte[currentBuffer.remaining()]; + plain.get(fill); + currentBuffer.put(fill); + } + } + + if (currentBuffer.hasRemaining()) { + return; + } else { + currentBuffer.flip(); + Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer))); + doConsume((Command) command); + nextFrameSize = -1; + } } + protected int secureRead(ByteBuffer plain) throws Exception { + if (!(inputBuffer.position() != 0 && inputBuffer.hasRemaining())) { + int bytesRead = channel.read(inputBuffer); - private int secureRead(ByteBuffer plain) throws Exception { - int bytesRead = channel.read(inputBuffer); - if (bytesRead == -1) { - sslEngine.closeInbound(); - if (inputBuffer.position() == 0 || - status == SSLEngineResult.Status.BUFFER_UNDERFLOW) { - return -1; + if (bytesRead == -1) { + sslEngine.closeInbound(); + if (inputBuffer.position() == 0 || + status == SSLEngineResult.Status.BUFFER_UNDERFLOW) { + return -1; + } } } @@ -206,12 +215,13 @@ public class NIOSSLTransport extends NIO res.bytesProduced() == 0); if (res.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED) { - finishHandshake(); + finishHandshake(); } status = res.getStatus(); handshakeStatus = res.getHandshakeStatus(); + //TODO deal with BUFFER_OVERFLOW if (status == SSLEngineResult.Status.CLOSED) { @@ -253,6 +263,7 @@ public class NIOSSLTransport extends NIO protected void doStop(ServiceStopper stopper) throws Exception { if (channel != null) { channel.close(); + channel = null; } super.doStop(stopper); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?rev=1150630&r1=1150629&r2=1150630&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Mon Jul 25 11:08:52 2011 @@ -161,6 +161,7 @@ public class NIOTransport extends TcpTra protected void doStop(ServiceStopper stopper) throws Exception { if (selection != null) { selection.close(); + selection = null; } super.doStop(stopper); } Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java?rev=1150630&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java Mon Jul 25 11:08:52 2011 @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.util.ByteArrayOutputStream; +import org.apache.activemq.util.DataByteArrayInputStream; +import org.apache.activemq.wireformat.WireFormat; + +import java.io.ByteArrayInputStream; +import java.util.HashMap; + +public class StompCodec { + + TcpTransport transport; + + ByteArrayOutputStream currentCommand = new ByteArrayOutputStream(); + boolean processedHeaders = false; + String action; + HashMap headers; + int contentLength = -1; + int readLength = 0; + int previousByte = -1; + + public StompCodec(TcpTransport transport) { + this.transport = transport; + } + + public void parse(ByteArrayInputStream input, int readSize) throws Exception { + int i = 0; + int b; + while(i++ < readSize) { + b = input.read(); + // skip repeating nulls + if (!processedHeaders && previousByte == 0 && b == 0) { + continue; + } + + if (!processedHeaders) { + currentCommand.write(b); + // end of headers section, parse action and header + if (previousByte == '\n' && b == '\n') { + if (transport.getWireFormat() instanceof StompWireFormat) { + DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray()); + action = ((StompWireFormat)transport.getWireFormat()).parseAction(data); + headers = ((StompWireFormat)transport.getWireFormat()).parseHeaders(data); + String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH); + if (contentLengthHeader != null) { + contentLength = ((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader); + } else { + contentLength = -1; + } + } + processedHeaders = true; + currentCommand.reset(); + } + } else { + + if (contentLength == -1) { + // end of command reached, unmarshal + if (b == 0) { + processCommand(); + } else { + currentCommand.write(b); + } + } else { + // read desired content length + if (readLength++ == contentLength) { + processCommand(); + readLength = 0; + } else { + currentCommand.write(b); + } + } + } + + previousByte = b; + } + } + + protected void processCommand() throws Exception { + StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray()); + transport.doConsume(frame); + processedHeaders = false; + currentCommand.reset(); + contentLength = -1; + } +} Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java?rev=1150630&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java Mon Jul 25 11:08:52 2011 @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import org.apache.activemq.transport.nio.NIOSSLTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.wireformat.WireFormat; + +import javax.net.SocketFactory; +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; + +public class StompNIOSSLTransport extends NIOSSLTransport { + + StompCodec codec; + + public StompNIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { + super(wireFormat, socketFactory, remoteLocation, localLocation); + } + + public StompNIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException { + super(wireFormat, socket); + } + + @Override + protected void initializeStreams() throws IOException { + codec = new StompCodec(this); + super.initializeStreams(); + if (inputBuffer.position() != 0 && inputBuffer.hasRemaining()) { + serviceRead(); + } + } + + @Override + protected void processCommand(ByteBuffer plain) throws Exception { + byte[] fill = new byte[plain.remaining()]; + plain.get(fill); + ByteArrayInputStream input = new ByteArrayInputStream(fill); + codec.parse(input, fill.length); + } + +} Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java?rev=1150630&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java Mon Jul 25 11:08:52 2011 @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import org.apache.activemq.broker.SslContext; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.wireformat.WireFormat; + +import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; + +public class StompNIOSSLTransportFactory extends StompNIOTransportFactory { + + SSLContext context; + + @Override + protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + return new TcpTransportServer(this, location, serverSocketFactory) { + protected Transport createTransport(Socket socket, WireFormat format) throws IOException { + StompNIOSSLTransport transport = new StompNIOSSLTransport(format, socket); + if (context != null) { + transport.setSslContext(context); + } + return transport; + } + }; + } + + @Override + protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { + return new StompNIOSSLTransport(wf, socketFactory, location, localLocation); + } + + @Override + public TransportServer doBind(URI location) throws IOException { + if (SslContext.getCurrentSslContext() != null) { + try { + context = SslContext.getCurrentSslContext().getSSLContext(); + } catch (Exception e) { + throw new IOException(e); + } + } + return super.doBind(location); + } + +} Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=1150630&r1=1150629&r2=1150630&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Mon Jul 25 11:08:52 2011 @@ -54,13 +54,7 @@ public class StompNIOTransport extends T private SelectorSelection selection; private ByteBuffer inputBuffer; - ByteArrayOutputStream currentCommand = new ByteArrayOutputStream(); - boolean processedHeaders = false; - String action; - HashMap headers; - int contentLength = -1; - int readLength = 0; - int previousByte = -1; + StompCodec codec; public StompNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); @@ -93,6 +87,7 @@ public class StompNIOTransport extends T NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024); this.dataOut = new DataOutputStream(outPutStream); this.buffOut = outPutStream; + codec = new StompCodec(this); } private void serviceRead() { @@ -114,57 +109,9 @@ public class StompNIOTransport extends T inputBuffer.flip(); - int b; ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array()); + codec.parse(input, readSize); - int i = 0; - while(i++ < readSize) { - b = input.read(); - // skip repeating nulls - if (!processedHeaders && previousByte == 0 && b == 0) { - continue; - } - - if (!processedHeaders) { - currentCommand.write(b); - // end of headers section, parse action and header - if (previousByte == '\n' && b == '\n') { - if (wireFormat instanceof StompWireFormat) { - DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray()); - action = ((StompWireFormat)wireFormat).parseAction(data); - headers = ((StompWireFormat)wireFormat).parseHeaders(data); - String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH); - if (contentLengthHeader != null) { - contentLength = ((StompWireFormat)wireFormat).parseContentLength(contentLengthHeader); - } else { - contentLength = -1; - } - } - processedHeaders = true; - currentCommand.reset(); - } - } else { - - if (contentLength == -1) { - // end of command reached, unmarshal - if (b == 0) { - processCommand(); - } else { - currentCommand.write(b); - } - } else { - // read desired content length - if (readLength++ == contentLength) { - processCommand(); - readLength = 0; - } else { - currentCommand.write(b); - } - } - } - - previousByte = b; - } // clear the buffer inputBuffer.clear(); @@ -176,14 +123,6 @@ public class StompNIOTransport extends T } } - private void processCommand() throws Exception { - StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray()); - doConsume(frame); - processedHeaders = false; - currentCommand.reset(); - contentLength = -1; - } - protected void doStart() throws Exception { connect(); selection.setInterestOps(SelectionKey.OP_READ); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=1150630&r1=1150629&r2=1150630&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Mon Jul 25 11:08:52 2011 @@ -687,4 +687,8 @@ public class TcpTransport extends Transp this.typeOfServiceChosen = false; return true; } + + public WireFormat getWireFormat() { + return wireFormat; + } } Added: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp%2Bnio%2Bssl?rev=1150630&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl (added) +++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio+ssl Mon Jul 25 11:08:52 2011 @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.stomp.StompNIOSSLTransportFactory \ No newline at end of file Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java?rev=1150630&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java Mon Jul 25 11:08:52 2011 @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import javax.net.SocketFactory; +import javax.net.ssl.SSLSocketFactory; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; + +public class StompNIOSSLTest extends StompTest { + + protected void setUp() throws Exception { + bindAddress = "stomp+nio+ssl://localhost:61613"; + confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml"; + System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore"); + System.setProperty("javax.net.ssl.trustStorePassword", "password"); + System.setProperty("javax.net.ssl.trustStoreType", "jks"); + System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore"); + System.setProperty("javax.net.ssl.keyStorePassword", "password"); + System.setProperty("javax.net.ssl.keyStoreType", "jks"); + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + } + + protected Socket createSocket(URI connectUri) throws IOException { + SocketFactory factory = SSLSocketFactory.getDefault(); + return factory.createSocket("127.0.0.1", connectUri.getPort()); + } + +} Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1150630&r1=1150629&r2=1150630&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Mon Jul 25 11:08:52 2011 @@ -111,6 +111,7 @@ public class StompTest extends Combinati } broker = BrokerFactory.createBroker(new URI(confUri)); broker.start(); + broker.waitUntilStarted(); stompConnect(); @@ -143,6 +144,7 @@ public class StompTest extends Combinati // Some tests explicitly disconnect from stomp so can ignore } finally { broker.stop(); + broker.waitUntilStopped(); } } Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml?rev=1150630&r1=1150629&r2=1150630&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml (original) +++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml Mon Jul 25 11:08:52 2011 @@ -69,6 +69,7 @@ +