Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 38148 invoked from network); 27 Jan 2009 20:53:21 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 27 Jan 2009 20:53:21 -0000 Received: (qmail 44539 invoked by uid 500); 27 Jan 2009 13:52:00 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 44512 invoked by uid 500); 27 Jan 2009 13:52:00 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 44503 invoked by uid 99); 27 Jan 2009 13:52:00 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Jan 2009 05:52:00 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Jan 2009 13:51:58 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 919532388875; Tue, 27 Jan 2009 13:51:38 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r738092 - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/network/ test/java/org/apache/activeblaze/ test/java/org/apache/activeblaze/clu... Date: Tue, 27 Jan 2009 13:51:38 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090127135138.919532388875@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Tue Jan 27 13:51:37 2009 New Revision: 738092 URL: http://svn.apache.org/viewvc?rev=738092&view=rev Log: Moved reliability choice down to the network layer Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=738092&r1=738091&r2=738092&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Tue Jan 27 13:51:37 2009 @@ -100,13 +100,13 @@ if (managementURIStr != null && managementURIStr.length() > 0) { managementURI = new URI(getConfiguration().getManagementURI()); } - Network transport = NetworkFactory.get(broadcastURI, managementURI); + Network transport = NetworkFactory.get(broadcastURI, managementURI, getConfiguration().getReliableBroadcast()); transport.setName(getId()); - this.broadcast = configureProcess(transport, getConfiguration().getReliableBroadcast()); + this.broadcast = configureProcess(transport); this.broadcast.init(); } - protected final Processor configureProcess(ChainedProcessor transport, String reliability) throws Exception { + protected final Processor configureProcess(ChainedProcessor transport) throws Exception { int maxPacketSize = getConfiguration().getMaxPacketSize(); CompressionProcessor result = new CompressionProcessor(); result.setPrev(this); @@ -115,8 +115,6 @@ FragmentationProcessor fp = new FragmentationProcessor(); fp.setMaxPacketSize(maxPacketSize); result.setEnd(fp); - ChainedProcessor reliable = getReliability(reliability); - result.setEnd(reliable); result.setEnd(transport); return result; } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=738092&r1=738091&r2=738092&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Tue Jan 27 13:51:37 2009 @@ -30,7 +30,9 @@ import org.apache.activeblaze.Processor; import org.apache.activeblaze.impl.destination.DestinationMatch; import org.apache.activeblaze.impl.processor.ChainedProcessor; +import org.apache.activeblaze.impl.processor.CompressionProcessor; import org.apache.activeblaze.impl.processor.DefaultChainedProcessor; +import org.apache.activeblaze.impl.processor.FragmentationProcessor; import org.apache.activeblaze.impl.processor.Packet; import org.apache.activeblaze.impl.reliable.ReliableFactory; import org.apache.activeblaze.impl.transport.BaseTransport; @@ -99,6 +101,21 @@ this.local = createLocal(unicastURI); this.group = createGroup(); } + + protected final Processor configureProcess(ChainedProcessor transport, String reliability) throws Exception { + int maxPacketSize = getConfiguration().getMaxPacketSize(); + CompressionProcessor result = new CompressionProcessor(); + result.setPrev(this); + result.setExceptionListener(this); + result.setMaxPacketSize(maxPacketSize); + FragmentationProcessor fp = new FragmentationProcessor(); + fp.setMaxPacketSize(maxPacketSize); + result.setEnd(fp); + ChainedProcessor reliable = getReliability(reliability); + result.setEnd(reliable); + result.setEnd(transport); + return result; + } protected ChainedProcessor getReliability(String reliability) throws Exception { DefaultChainedProcessor reliable = ReliableFactory.get(reliability); Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java?rev=738092&r1=738091&r2=738092&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java Tue Jan 27 13:51:37 2009 @@ -20,8 +20,10 @@ import java.net.URI; import org.apache.activeblaze.ExceptionListener; import org.apache.activeblaze.Processor; +import org.apache.activeblaze.impl.processor.ChainedProcessor; import org.apache.activeblaze.impl.processor.DefaultChainedProcessor; import org.apache.activeblaze.impl.processor.Packet; +import org.apache.activeblaze.impl.reliable.ReliableFactory; import org.apache.activeblaze.impl.transport.BaseTransport; import org.apache.activeblaze.impl.transport.TransportFactory; @@ -32,11 +34,12 @@ public class MulticastNetwork extends DefaultChainedProcessor implements Network, ExceptionListener { private URI uri; private URI managementURI; - private BaseTransport broadcast; - private BaseTransport management; + private ChainedProcessor broadcast; + private ChainedProcessor management; private String name = ""; private InetSocketAddress broadcastAddress; private InetSocketAddress managementAddress; + private String reliability = "simple"; /** * @return the name @@ -70,25 +73,46 @@ } /** - * @return true if initialized + * @return the reliable protocol used + * @see org.apache.activeblaze.impl.network.Network#getReliability() + */ + public String getReliability() { + return this.reliability; + } + + /** + * @param reliability + * @see org.apache.activeblaze.impl.network.Network#setReliability(java.lang.String) + */ + public void setReliability(String reliability) { + this.reliability = reliability; + } + + /** + * initialize the network + * * @throws Exception * @see org.apache.activeblaze.Service#init() */ public void doInit() throws Exception { super.doInit(); - this.broadcast = TransportFactory.get(this.uri); - this.broadcast.setName(getName() + "-Broadcast"); - this.broadcast.setExceptionListener(this); + this.broadcast = ReliableFactory.get(getReliability()); + BaseTransport transport = TransportFactory.get(this.uri); + transport.setName(getName() + "-Broadcast"); + transport.setExceptionListener(this); this.broadcast.setPrev(getPrev()); - this.broadcast.setNext(getNext()); + this.broadcast.setNext(transport); + transport.setPrev(this.broadcast); this.broadcastAddress = new InetSocketAddress(this.uri.getHost(), this.uri.getPort()); this.broadcast.init(); if (this.managementURI != null && !this.managementURI.equals(this.uri)) { - this.management = TransportFactory.get(this.managementURI); - this.management.setName(getName() + "-Management"); - this.management.setExceptionListener(this); + this.management = ReliableFactory.get(getReliability()); + BaseTransport managementTransport = TransportFactory.get(this.managementURI); + managementTransport.setName(getName() + "-Management"); + managementTransport.setExceptionListener(this); this.management.setPrev(getPrev()); - this.management.setNext(getNext()); + this.management.setNext(managementTransport); + managementTransport.setPrev(this.management); this.managementAddress = new InetSocketAddress(this.managementURI.getHost(), this.managementURI.getPort()); this.management.init(); } Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java?rev=738092&r1=738091&r2=738092&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java Tue Jan 27 13:51:37 2009 @@ -17,7 +17,6 @@ package org.apache.activeblaze.impl.network; import java.net.URI; import org.apache.activeblaze.impl.processor.ChainedProcessor; -import org.apache.activeblaze.impl.processor.Packet; /** *

@@ -50,6 +49,17 @@ */ public void setManagementURI(URI uri) throws Exception; + /** + * Set the reliable protocol to use for this network + * @param reliability + */ + public void setReliability(String reliability); + + /** + * @return the reliability protocol used for this network + */ + public String getReliability(); + Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java?rev=738092&r1=738091&r2=738092&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java Tue Jan 27 13:51:37 2009 @@ -35,7 +35,7 @@ * @return the network associated with the URI * @throws Exception */ - public static Network get(URI broadcast, URI management) throws Exception { + public static Network get(URI broadcast, URI management,String reliability) throws Exception { Network result = findNetwork(broadcast); result.setURI(broadcast); result.setManagementURI(management); Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java?rev=738092&r1=738091&r2=738092&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java (original) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java Tue Jan 27 13:51:37 2009 @@ -24,8 +24,10 @@ import org.apache.activeblaze.BlazeException; import org.apache.activeblaze.ExceptionListener; import org.apache.activeblaze.Processor; +import org.apache.activeblaze.impl.processor.ChainedProcessor; import org.apache.activeblaze.impl.processor.DefaultChainedProcessor; import org.apache.activeblaze.impl.processor.Packet; +import org.apache.activeblaze.impl.reliable.ReliableFactory; import org.apache.activeblaze.impl.transport.BaseTransport; import org.apache.activeblaze.impl.transport.TransportFactory; import org.apache.activeblaze.util.URISupport; @@ -43,10 +45,11 @@ private List managementURIs = new ArrayList(); private List broadcastAddresses = new ArrayList(); private List managementAddresses = new ArrayList(); - private BaseTransport broadcast; - private BaseTransport management; + private ChainedProcessor broadcast; + private ChainedProcessor management; private String name = ""; private boolean useFirstFreeAddress; + private String reliability = "swp"; /** * @return the name @@ -64,6 +67,22 @@ } /** + * @return the reliability protocol used + * @see org.apache.activeblaze.impl.network.Network#getReliability() + */ + public String getReliability() { + return this.reliability; + } + + /** + * @param reliability + * @see org.apache.activeblaze.impl.network.Network#setReliability(java.lang.String) + */ + public void setReliability(String reliability) { + this.reliability = reliability; + } + + /** * @param location * @throws Exception * @see org.apache.activeblaze.impl.network.Network#setManagementURI(java.net.URI) @@ -110,19 +129,25 @@ */ public void doInit() throws Exception { super.doInit(); - this.broadcast = createTransport(this.broadcastURIs); - this.broadcast.setName(getName() + "-Broadcast"); - this.broadcast.setExceptionListener(this); + this.broadcast = ReliableFactory.get(getReliability()); + BaseTransport transport = createTransport(this.broadcastURIs); + transport.setName(getName() + "-Broadcast"); + transport.setExceptionListener(this); this.broadcast.setPrev(getPrev()); - this.broadcast.setNext(getNext()); + this.broadcast.setNext(transport); + transport.setPrev(this.broadcast); + this.broadcast.init(); if (!this.managementURIs.isEmpty() && !this.managementURIs.equals(this.broadcastURIs)) { - this.management = createTransport(this.managementURIs); - this.management.setName(getName() + "-Management"); - this.management.setExceptionListener(this); + this.management = ReliableFactory.get(getReliability()); + BaseTransport managementTransport =createTransport(this.managementURIs); + managementTransport.setName(getName() + "-Management"); + managementTransport.setExceptionListener(this); this.management.setPrev(getPrev()); - this.management.setNext(getNext()); + this.management.setNext(managementTransport); + managementTransport.setPrev(this.management); this.management.init(); } + } /** Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java?rev=738092&r1=738091&r2=738092&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java (original) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java Tue Jan 27 13:51:37 2009 @@ -31,5 +31,6 @@ uri += ")"; fac.getConfiguration().setManagementURI(""); fac.getConfiguration().setBroadcastURI(uri); + fac.getConfiguration().setReliableBroadcast("swp"); } } Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java?rev=738092&r1=738091&r2=738092&view=diff ============================================================================== --- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java (original) +++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java Tue Jan 27 13:51:37 2009 @@ -30,5 +30,6 @@ uri += ")"; fac.getConfiguration().setManagementURI(""); fac.getConfiguration().setBroadcastURI(uri); + fac.getConfiguration().setReliableBroadcast("swp"); } }