Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 51424 invoked from network); 20 Dec 2007 14:38:09 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 20 Dec 2007 14:38:09 -0000 Received: (qmail 74514 invoked by uid 500); 20 Dec 2007 14:37:58 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 74496 invoked by uid 500); 20 Dec 2007 14:37:58 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 74487 invoked by uid 99); 20 Dec 2007 14:37:58 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Dec 2007 06:37:58 -0800 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Dec 2007 14:37:53 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 3DFE21A9832; Thu, 20 Dec 2007 06:37:44 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r605944 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/ test/java/org/apache/activemq/transport/stomp/ Date: Thu, 20 Dec 2007 14:37:43 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20071220143744.3DFE21A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Thu Dec 20 06:37:41 2007 New Revision: 605944 URL: http://svn.apache.org/viewvc?rev=605944&view=rev Log: Applied Dejan's patch on AMQ-1272 with some small tweaks. Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=605944&r1=605943&r2=605944&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Thu Dec 20 06:37:41 2007 @@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.Destination; import javax.jms.JMSException; import org.apache.activemq.command.ActiveMQDestination; @@ -33,11 +32,13 @@ import org.apache.activemq.command.ActiveMQTempQueue; import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; @@ -51,6 +52,7 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.util.ByteArrayOutputStream; +import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.LongSequenceGenerator; @@ -94,17 +96,23 @@ } } - protected ResponseHandler createResponseHandler(StompFrame command) { + protected ResponseHandler createResponseHandler(final StompFrame command) { final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); - // A response may not be needed. if (receiptId != null) { return new ResponseHandler() { public void onResponse(ProtocolConverter converter, Response response) throws IOException { - StompFrame sc = new StompFrame(); - sc.setAction(Stomp.Responses.RECEIPT); - sc.setHeaders(new HashMap(1)); - sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); - transportFilter.sendToStomp(sc); + if (response.isException()) { + // Generally a command can fail.. but that does not invalidate the connection. + // We report back the failure but we don't close the connection. + Throwable exception = ((ExceptionResponse)response).getException(); + handleException(exception, command); + } else { + StompFrame sc = new StompFrame(); + sc.setAction(Stomp.Responses.RECEIPT); + sc.setHeaders(new HashMap(1)); + sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); + transportFilter.sendToStomp(sc); + } } }; } @@ -160,28 +168,33 @@ } } catch (ProtocolException e) { - - // Let the stomp client know about any protocol errors. - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8")); - e.printStackTrace(stream); - stream.close(); - - HashMap headers = new HashMap(); - headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage()); - - final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); - if (receiptId != null) { - headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); + handleException(e, command); + // Some protocol errors can cause the connection to get closed. + if( e.isFatal() ) { + getTransportFilter().onException(e); } + } + } + + protected void handleException(Throwable exception, StompFrame command) throws IOException { + // Let the stomp client know about any protocol errors. + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8")); + exception.printStackTrace(stream); + stream.close(); - StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray()); - sendToStomp(errorMessage); + HashMap headers = new HashMap(); + headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage()); - if (e.isFatal()) { - getTransportFilter().onException(e); - } + if (command != null) { + final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); + if (receiptId != null) { + headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId); + } } + + StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray()); + sendToStomp(errorMessage); } protected void onStompSend(StompFrame command) throws IOException, JMSException { @@ -393,7 +406,7 @@ throw new ProtocolException("No subscription matched."); } - protected void onStompConnect(StompFrame command) throws ProtocolException { + protected void onStompConnect(final StompFrame command) throws ProtocolException { if (connected.get()) { throw new ProtocolException("Allready connected."); @@ -424,13 +437,28 @@ sendToActiveMQ(connectionInfo, new ResponseHandler() { public void onResponse(ProtocolConverter converter, Response response) throws IOException { + if (response.isException()) { + // If the connection attempt fails we close the socket. + Throwable exception = ((ExceptionResponse)response).getException(); + handleException(exception, command); + getTransportFilter().onException(IOExceptionSupport.create(exception)); + return; + } + final SessionInfo sessionInfo = new SessionInfo(sessionId); sendToActiveMQ(sessionInfo, null); final ProducerInfo producerInfo = new ProducerInfo(producerId); sendToActiveMQ(producerInfo, new ResponseHandler() { public void onResponse(ProtocolConverter converter, Response response) throws IOException { - + + if (response.isException()) { + // If the connection attempt fails we close the socket. + Throwable exception = ((ExceptionResponse)response).getException(); + handleException(exception, command); + getTransportFilter().onException(IOExceptionSupport.create(exception)); + } + connected.set(true); HashMap responseHeaders = new HashMap(); @@ -483,8 +511,13 @@ ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); if (rh != null) { rh.onResponse(this, response); + } else { + // Pass down any unexpected errors. Should this close the connection? + if (response.isException()) { + Throwable exception = ((ExceptionResponse)response).getException(); + handleException(exception, null); + } } - } else if (command.isMessageDispatch()) { MessageDispatch md = (MessageDispatch)command; @@ -492,6 +525,10 @@ if (sub != null) { sub.onMessageDispatch(md); } + } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { + // Pass down any unexpected async errors. Should this close the connection? + Throwable exception = ((ConnectionError)command).getException(); + handleException(exception, null); } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=605944&r1=605943&r2=605944&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompConnection.java Thu Dec 20 06:37:41 2007 @@ -30,9 +30,13 @@ private Socket stompSocket; private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream(); - + public void open(String host, int port) throws IOException, UnknownHostException { - stompSocket = new Socket(host, port); + open(new Socket(host, port)); + } + + public void open(Socket socket) { + stompSocket = socket; } public void close() throws IOException { @@ -75,5 +79,13 @@ } } } + + public Socket getStompSocket() { + return stompSocket; + } + + public void setStompSocket(Socket stompSocket) { + this.stompSocket = stompSocket; + } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=605944&r1=605943&r2=605944&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Thu Dec 20 06:37:41 2007 @@ -35,49 +35,49 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.transport.reliable.UnreliableUdpTransportTest; +import org.apache.activemq.security.AuthorizationPlugin; +import org.apache.activemq.security.SimpleSecurityBrokerSystemTest; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class StompTest extends CombinationTestSupport { private static final Log LOG = LogFactory.getLog(StompTest.class); - protected String bindAddress = "stomp://localhost:0"; + protected String bindAddress = "stomp://localhost:61613"; + protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml"; private BrokerService broker; - private TransportConnector connector; private StompConnection stompConnection = new StompConnection(); private Connection connection; private Session session; private ActiveMQQueue queue; - + protected void setUp() throws Exception { - broker = new BrokerService(); - broker.setPersistent(false); - - connector = broker.addConnector(bindAddress); + broker = BrokerFactory.createBroker(new URI(confUri)); broker.start(); stompConnect(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - connection = cf.createConnection(); + connection = cf.createConnection("system", "manager"); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); queue = new ActiveMQQueue(getQueueName()); connection.start(); } private void stompConnect() throws IOException, URISyntaxException, UnknownHostException { - URI connectUri = connector.getConnectUri(); - stompConnection.open("127.0.0.1", connectUri.getPort()); + URI connectUri = new URI(bindAddress); + stompConnection.open(createSocket(connectUri)); } protected Socket createSocket(URI connectUri) throws IOException { - return new Socket(); + return new Socket("127.0.0.1", connectUri.getPort()); } protected String getQueueName() { @@ -117,7 +117,7 @@ public void testConnect() throws Exception { - String connectFrame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL; + String connectFrame = "CONNECT\n" + "login: system\n" + "passcode: manager\n" + "request-id: 1\n" + "\n" + Stomp.NULL; stompConnection.sendFrame(connectFrame); String f = stompConnection.receiveFrame(); @@ -130,7 +130,7 @@ MessageConsumer consumer = session.createConsumer(queue); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); @@ -155,7 +155,7 @@ MessageConsumer consumer = session.createConsumer(queue); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); @@ -174,7 +174,7 @@ MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'"); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); @@ -195,7 +195,7 @@ MessageConsumer consumer = session.createConsumer(queue); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); @@ -222,7 +222,7 @@ public void testSubscribeWithAutoAck() throws Exception { - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); @@ -242,7 +242,7 @@ public void testSubscribeWithAutoAckAndBytesMessage() throws Exception { - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); @@ -271,7 +271,7 @@ public void testSubscribeWithMessageSentWithProperties() throws Exception { - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); @@ -305,7 +305,7 @@ int ctr = 10; String[] data = new String[ctr]; - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); @@ -343,7 +343,7 @@ public void testSubscribeWithAutoAckAndSelector() throws Exception { - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); @@ -365,7 +365,7 @@ public void testSubscribeWithClientAck() throws Exception { - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); @@ -389,7 +389,7 @@ public void testUnsubscribe() throws Exception { - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("CONNECTED")); @@ -426,7 +426,7 @@ public void testTransactionCommit() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); String f = stompConnection.receiveFrame(); @@ -450,7 +450,7 @@ public void testTransactionRollback() throws Exception { MessageConsumer consumer = session.createConsumer(queue); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); String f = stompConnection.receiveFrame(); @@ -486,7 +486,7 @@ public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception { assertClients(1); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); @@ -502,7 +502,61 @@ assertClients(1); } + + public void testConnectNotAuthenticatedWrongUser() throws Exception { + String frame = "CONNECT\n" + "login: dejanb\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String f = stompConnection.receiveFrame(); + + assertTrue(f.startsWith("ERROR")); + assertClients(1); + + } + + public void testConnectNotAuthenticatedWrongPassword() throws Exception { + + String frame = "CONNECT\n" + "login: system\n" + "passcode: dejanb\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + String f = stompConnection.receiveFrame(); + + assertTrue(f.startsWith("ERROR")); + assertClients(1); + } + + public void testSendNotAuthorized() throws Exception { + + String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SEND\n" + "destination:/queue/USERS." + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(frame); + String f = stompConnection.receiveFrame(); + assertTrue(f.startsWith("ERROR")); + + } + + public void testSubscribeNotAuthorized() throws Exception { + + String frame = "CONNECT\n" + "login: guest\n" + "passcode: password\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; + + stompConnection.sendFrame(frame); + String f = stompConnection.receiveFrame(); + assertTrue(f.startsWith("ERROR")); + } + protected void assertClients(int expected) throws Exception { org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients(); int actual = clients.length;