Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 20320 invoked from network); 18 May 2010 15:45:33 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 18 May 2010 15:45:33 -0000 Received: (qmail 7177 invoked by uid 500); 18 May 2010 15:45:33 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 7137 invoked by uid 500); 18 May 2010 15:45:33 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 7130 invoked by uid 99); 18 May 2010 15:45:33 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 May 2010 15:45:33 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 May 2010 15:45:29 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 83305238890A; Tue, 18 May 2010 15:45:07 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r945709 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/tcp/TimeStampStream.java test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java Date: Tue, 18 May 2010 15:45:07 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100518154507.83305238890A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Tue May 18 15:45:07 2010 New Revision: 945709 URL: http://svn.apache.org/viewvc?rev=945709&view=rev Log: resolve https://issues.apache.org/activemq/browse/AMQ-2737 - add the two new files, oops Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TimeStampStream.java (with props) activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java (with props) Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TimeStampStream.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TimeStampStream.java?rev=945709&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TimeStampStream.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TimeStampStream.java Tue May 18 15:45:07 2010 @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.tcp; + +public interface TimeStampStream { + + public abstract boolean isWriting(); + + public abstract long getWriteTimestamp(); + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TimeStampStream.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TimeStampStream.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java?rev=945709&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java Tue May 18 15:45:07 2010 @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport; + +import java.net.Socket; +import java.net.SocketException; +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import junit.framework.Test; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.JmsTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompConnection; +import org.apache.activemq.util.SocketProxy; +import org.apache.activemq.util.URISupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class SoWriteTimeoutTest extends JmsTestSupport { + private static final Log LOG = LogFactory.getLog(SoWriteTimeoutTest.class); + + final int receiveBufferSize = 16*1024; + public String brokerTransportScheme = "nio"; + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + broker.addConnector(brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize="+ receiveBufferSize); + if ("nio".equals(brokerTransportScheme)) { + broker.addConnector("stomp+" + brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize=" + receiveBufferSize + "&trace=true"); + } + return broker; + } + + public void initCombosForTestWriteTimeout() { + addCombinationValues("brokerTransportScheme", new Object[]{"tcp", "nio"}); + } + + public void testWriteTimeout() throws Exception { + + Destination dest = new ActiveMQQueue("testWriteTimeout"); + messageTextPrefix = initMessagePrefix(8*1024); + sendMessages(dest, 500); + + URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri()); + LOG.info("consuming using uri: " + tcpBrokerUri); + + SocketProxy proxy = new SocketProxy(); + proxy.setTarget(tcpBrokerUri); + proxy.setReceiveBufferSize(receiveBufferSize); + proxy.open(); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(proxy.getUrl()); + Connection c = factory.createConnection(); + c.start(); + Session session = c.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(dest); + proxy.pause(); + // writes should back up... writeTimeout will kick in a abort the connection + TimeUnit.SECONDS.sleep(4); + proxy.goOn(); + assertNotNull("can receive buffered messages", consumer.receive(500)); + try { + session.commit(); + fail("expect commit to fail as server has aborted writeTimeout connection"); + } catch (JMSException expected) { + } + } + + public void testWriteTimeoutStompNio() throws Exception { + ActiveMQQueue dest = new ActiveMQQueue("testWriteTimeout"); + messageTextPrefix = initMessagePrefix(8*1024); + sendMessages(dest, 500); + + URI stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri()); + LOG.info("consuming using uri: " + stompBrokerUri); + + SocketProxy proxy = new SocketProxy(); + proxy.setTarget(new URI("tcp://localhost:" + stompBrokerUri.getPort())); + proxy.setReceiveBufferSize(receiveBufferSize); + proxy.open(); + + StompConnection stompConnection = new StompConnection(); + stompConnection.open(new Socket("localhost", proxy.getUrl().getPort())); + + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + dest.getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + // ensure dispatch has started before pause + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + proxy.pause(); + + // writes should back up... writeTimeout will kick in a abort the connection + TimeUnit.SECONDS.sleep(1); + + // see the blocked threads + //dumpAllThreads("blocked on write"); + + // abort should be done after this + TimeUnit.SECONDS.sleep(4); + + proxy.goOn(); + + // get a buffered message + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("MESSAGE")); + + // verify connection is dead + try { + for (int i=0; i<10; i++) { + stompConnection.send("/queue/" + dest.getPhysicalName(), messageTextPrefix + "ReplyText" + i); + } + fail("expected send to fail with timeout out connection"); + } catch (SocketException expected) { + LOG.info("got exception on send after timeout: " + expected); + } + } + + private String initMessagePrefix(int i) { + byte[] content = new byte[i]; + return new String(content); + } + + public static Test suite() { + return suite(SoWriteTimeoutTest.class); + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date