Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 8970 invoked from network); 15 Jun 2009 18:33:02 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 15 Jun 2009 18:33:02 -0000 Received: (qmail 92124 invoked by uid 500); 15 Jun 2009 18:33:13 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 92085 invoked by uid 500); 15 Jun 2009 18:33:13 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 92076 invoked by uid 99); 15 Jun 2009 18:33:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 Jun 2009 18:33:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 Jun 2009 18:33:10 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id C96E3238885B; Mon, 15 Jun 2009 18:32:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r784902 - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/ activemq-broker/src/test/java/org/apache/activemq/apollo/t... 
Date: Mon, 15 Jun 2009 18:32:48 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090615183248.C96E3238885B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Mon Jun 15 18:32:48 2009 New Revision: 784902 URL: http://svn.apache.org/viewvc?rev=784902&view=rev Log: Adding a couple more test cases to the VMTransport. Enhanced the Pipe transport so that an EOFException is raised on the peer when the transport is closed. Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java?rev=784902&r1=784901&r2=784902&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java (original) +++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java Mon Jun 15 18:32:48 2009 @@ -21,16 +21,20 @@ import org.apache.activemq.apollo.Connection; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.wireformat.WireFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; public class BrokerConnection extends Connection { + 
private static final Log LOG = LogFactory.getLog(BrokerConnection.class); + protected Broker broker; private ProtocolHandler protocolHandler; public BrokerConnection() { setExceptionListener(new ExceptionListener(){ public void exceptionThrown(Exception error) { - error.printStackTrace(); + LOG.debug("Transport failed before messaging protocol was initialized.", error); try { stop(); } catch (Exception ignore) { Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java?rev=784902&r1=784901&r2=784902&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java (original) +++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java Mon Jun 15 18:32:48 2009 @@ -77,6 +77,7 @@ private void stopBroker() { try { this.broker.stop(); + unbind(this); } catch (Exception e) { LOG.error("Failed to stop the broker gracefully: "+e); LOG.debug("Failed to stop the broker gracefully: ", e); Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?rev=784902&r1=784901&r2=784902&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (original) +++ 
activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java Mon Jun 15 18:32:48 2009 @@ -16,6 +16,7 @@ */ package org.apache.activemq.apollo.transport.vm; +import java.io.IOException; import java.net.URI; import junit.framework.TestCase; @@ -33,12 +34,27 @@ System.setProperty("org.apache.activemq.default.directory.prefix", "target/test-data/"); } - public void testAutoCreateBroker() throws Exception { - Transport connect = TransportFactory.compositeConnect(new URI("vm://test")); assertNotNull(connect); - + connect.stop(); + System.out.println("done"); + } + + public void testNoAutoCreateBroker() throws Exception { + try { + TransportFactory.compositeConnect(new URI("vm://test?create=false")); + fail("Expected a IOException"); + } catch (IOException e) { + } + } + + public void testBadOptions() throws Exception { + try { + TransportFactory.compositeConnect(new URI("vm://test?crazy-option=false")); + fail("Expected a IllegalArgumentException"); + } catch (IllegalArgumentException e) { + } } } Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=784902&r1=784901&r2=784902&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original) +++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Mon Jun 15 18:32:48 2009 @@ -1,5 +1,6 @@ package org.apache.activemq.transport.pipe; +import java.io.EOFException; import java.io.IOException; import java.io.InterruptedIOException; import java.net.InetSocketAddress; @@ -30,6 +31,7 @@ import 
org.apache.activemq.wireformat.WireFormatFactory; public class PipeTransportFactory extends TransportFactory { + static private final Object EOF_TOKEN = new Object(); protected final HashMap servers = new HashMap(); @@ -59,11 +61,14 @@ } public void stop() throws Exception { + pipe.write(EOF_TOKEN); if (readContext != null) { readContext.close(true); } else { stopping.set(true); - thread.join(); + if( thread!=null ) { + thread.join(); + } } } @@ -107,6 +112,9 @@ pipe.setReadReadyListener(this); return true; } else { + if(o == EOF_TOKEN) { + throw new EOFException(); + } if (wireFormat != null) { listener.onCommand(wireFormat.unmarshal((ByteSequence) o)); } else { @@ -126,9 +134,15 @@ while (!stopping.get()) { Object value = pipe.poll(500, TimeUnit.MILLISECONDS); if (value != null) { - listener.onCommand(value); + if(value == EOF_TOKEN) { + throw new EOFException(); + } else { + listener.onCommand(value); + } } } + } catch (IOException e) { + listener.onException(e); } catch (InterruptedException e) { } } @@ -302,7 +316,7 @@ return new PipeTransportServer(); } - private synchronized void unbind(PipeTransportServer server) { + protected synchronized void unbind(PipeTransportServer server) { servers.remove(server.getName()); }