activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r738092 - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/network/ test/java/org/apache/activeblaze/ test/java/org/apache/activeblaze/clu...
Date Tue, 27 Jan 2009 13:51:38 GMT
Author: rajdavies
Date: Tue Jan 27 13:51:37 2009
New Revision: 738092

URL: http://svn.apache.org/viewvc?rev=738092&view=rev
Log:
Moved reliability choice down to the network layer

Modified:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
Tue Jan 27 13:51:37 2009
@@ -100,13 +100,13 @@
         if (managementURIStr != null && managementURIStr.length() > 0) {
             managementURI = new URI(getConfiguration().getManagementURI());
         }
-        Network transport = NetworkFactory.get(broadcastURI, managementURI);
+        Network transport = NetworkFactory.get(broadcastURI, managementURI, getConfiguration().getReliableBroadcast());
         transport.setName(getId());
-        this.broadcast = configureProcess(transport, getConfiguration().getReliableBroadcast());
+        this.broadcast = configureProcess(transport);
         this.broadcast.init();
     }
 
-    protected final Processor configureProcess(ChainedProcessor transport, String reliability)
throws Exception {
+    protected final Processor configureProcess(ChainedProcessor transport) throws Exception
{
         int maxPacketSize = getConfiguration().getMaxPacketSize();
         CompressionProcessor result = new CompressionProcessor();
         result.setPrev(this);
@@ -115,8 +115,6 @@
         FragmentationProcessor fp = new FragmentationProcessor();
         fp.setMaxPacketSize(maxPacketSize);
         result.setEnd(fp);
-        ChainedProcessor reliable = getReliability(reliability);
-        result.setEnd(reliable);
         result.setEnd(transport);
         return result;
     }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
Tue Jan 27 13:51:37 2009
@@ -30,7 +30,9 @@
 import org.apache.activeblaze.Processor;
 import org.apache.activeblaze.impl.destination.DestinationMatch;
 import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.CompressionProcessor;
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
+import org.apache.activeblaze.impl.processor.FragmentationProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.reliable.ReliableFactory;
 import org.apache.activeblaze.impl.transport.BaseTransport;
@@ -99,6 +101,21 @@
         this.local = createLocal(unicastURI);
         this.group = createGroup();
     }
+    
+    protected final Processor configureProcess(ChainedProcessor transport, String reliability)
throws Exception {
+        int maxPacketSize = getConfiguration().getMaxPacketSize();
+        CompressionProcessor result = new CompressionProcessor();
+        result.setPrev(this);
+        result.setExceptionListener(this);
+        result.setMaxPacketSize(maxPacketSize);
+        FragmentationProcessor fp = new FragmentationProcessor();
+        fp.setMaxPacketSize(maxPacketSize);
+        result.setEnd(fp);
+        ChainedProcessor reliable = getReliability(reliability);
+        result.setEnd(reliable);
+        result.setEnd(transport);
+        return result;
+    }
 
     protected ChainedProcessor getReliability(String reliability) throws Exception {
         DefaultChainedProcessor reliable = ReliableFactory.get(reliability);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
Tue Jan 27 13:51:37 2009
@@ -20,8 +20,10 @@
 import java.net.URI;
 import org.apache.activeblaze.ExceptionListener;
 import org.apache.activeblaze.Processor;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.reliable.ReliableFactory;
 import org.apache.activeblaze.impl.transport.BaseTransport;
 import org.apache.activeblaze.impl.transport.TransportFactory;
 
@@ -32,11 +34,12 @@
 public class MulticastNetwork extends DefaultChainedProcessor implements Network, ExceptionListener
{
     private URI uri;
     private URI managementURI;
-    private BaseTransport broadcast;
-    private BaseTransport management;
+    private ChainedProcessor broadcast;
+    private ChainedProcessor management;
     private String name = "";
     private InetSocketAddress broadcastAddress;
     private InetSocketAddress managementAddress;
+    private String reliability = "simple";
 
     /**
      * @return the name
@@ -70,25 +73,46 @@
     }
 
     /**
-     * @return true if initialized
+     * @return the reliable protocol used
+     * @see org.apache.activeblaze.impl.network.Network#getReliability()
+     */
+    public String getReliability() {
+        return this.reliability;
+    }
+
+    /**
+     * @param reliability
+     * @see org.apache.activeblaze.impl.network.Network#setReliability(java.lang.String)
+     */
+    public void setReliability(String reliability) {
+        this.reliability = reliability;
+    }
+
+    /**
+     * initialize the network
+     * 
      * @throws Exception
      * @see org.apache.activeblaze.Service#init()
      */
     public void doInit() throws Exception {
         super.doInit();
-        this.broadcast = TransportFactory.get(this.uri);
-        this.broadcast.setName(getName() + "-Broadcast");
-        this.broadcast.setExceptionListener(this);
+        this.broadcast = ReliableFactory.get(getReliability());
+        BaseTransport transport = TransportFactory.get(this.uri);
+        transport.setName(getName() + "-Broadcast");
+        transport.setExceptionListener(this);
         this.broadcast.setPrev(getPrev());
-        this.broadcast.setNext(getNext());
+        this.broadcast.setNext(transport);
+        transport.setPrev(this.broadcast);
         this.broadcastAddress = new InetSocketAddress(this.uri.getHost(), this.uri.getPort());
         this.broadcast.init();
         if (this.managementURI != null && !this.managementURI.equals(this.uri)) {
-            this.management = TransportFactory.get(this.managementURI);
-            this.management.setName(getName() + "-Management");
-            this.management.setExceptionListener(this);
+            this.management = ReliableFactory.get(getReliability());
+            BaseTransport managementTransport = TransportFactory.get(this.managementURI);
+            managementTransport.setName(getName() + "-Management");
+            managementTransport.setExceptionListener(this);
             this.management.setPrev(getPrev());
-            this.management.setNext(getNext());
+            this.management.setNext(managementTransport);
+            managementTransport.setPrev(this.management);
             this.managementAddress = new InetSocketAddress(this.managementURI.getHost(),
this.managementURI.getPort());
             this.management.init();
         }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
Tue Jan 27 13:51:37 2009
@@ -17,7 +17,6 @@
 package org.apache.activeblaze.impl.network;
 import java.net.URI;
 import org.apache.activeblaze.impl.processor.ChainedProcessor;
-import org.apache.activeblaze.impl.processor.Packet;
 
 /**
  * <P>
@@ -50,6 +49,17 @@
      */
     public void setManagementURI(URI uri) throws Exception;
     
+    /**
+     * Set the reliable protocol to use for this network
+     * @param reliability
+     */
+    public void setReliability(String reliability);
+    
+    /**
+     * @return the reliability protocol used for this network
+     */
+    public String getReliability();
+    
         
    
     

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
Tue Jan 27 13:51:37 2009
@@ -35,7 +35,7 @@
      * @return the network associated with the URI
      * @throws Exception
      */
-    public static Network get(URI broadcast, URI management) throws Exception {
+    public static Network get(URI broadcast, URI management,String reliability) throws Exception
{
         Network result = findNetwork(broadcast);
         result.setURI(broadcast);
         result.setManagementURI(management);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
Tue Jan 27 13:51:37 2009
@@ -24,8 +24,10 @@
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.ExceptionListener;
 import org.apache.activeblaze.Processor;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.reliable.ReliableFactory;
 import org.apache.activeblaze.impl.transport.BaseTransport;
 import org.apache.activeblaze.impl.transport.TransportFactory;
 import org.apache.activeblaze.util.URISupport;
@@ -43,10 +45,11 @@
     private List<URI> managementURIs = new ArrayList<URI>();
     private List<InetSocketAddress> broadcastAddresses = new ArrayList<InetSocketAddress>();
     private List<InetSocketAddress> managementAddresses = new ArrayList<InetSocketAddress>();
-    private BaseTransport broadcast;
-    private BaseTransport management;
+    private ChainedProcessor broadcast;
+    private ChainedProcessor management;
     private String name = "";
     private boolean useFirstFreeAddress;
+    private String reliability = "swp";
 
     /**
      * @return the name
@@ -64,6 +67,22 @@
     }
 
     /**
+     * @return the reliability protocol used
+     * @see org.apache.activeblaze.impl.network.Network#getReliability()
+     */
+    public String getReliability() {
+        return this.reliability;
+    }
+
+    /**
+     * @param reliability
+     * @see org.apache.activeblaze.impl.network.Network#setReliability(java.lang.String)
+     */
+    public void setReliability(String reliability) {
+        this.reliability = reliability;
+    }
+
+    /**
      * @param location
      * @throws Exception
      * @see org.apache.activeblaze.impl.network.Network#setManagementURI(java.net.URI)
@@ -110,19 +129,25 @@
      */
     public void doInit() throws Exception {
         super.doInit();
-        this.broadcast = createTransport(this.broadcastURIs);
-        this.broadcast.setName(getName() + "-Broadcast");
-        this.broadcast.setExceptionListener(this);
+        this.broadcast = ReliableFactory.get(getReliability());
+        BaseTransport transport = createTransport(this.broadcastURIs);
+        transport.setName(getName() + "-Broadcast");
+        transport.setExceptionListener(this);
         this.broadcast.setPrev(getPrev());
-        this.broadcast.setNext(getNext());
+        this.broadcast.setNext(transport);
+        transport.setPrev(this.broadcast);
+        this.broadcast.init();
         if (!this.managementURIs.isEmpty() && !this.managementURIs.equals(this.broadcastURIs))
{
-            this.management = createTransport(this.managementURIs);
-            this.management.setName(getName() + "-Management");
-            this.management.setExceptionListener(this);
+            this.management = ReliableFactory.get(getReliability());
+                BaseTransport managementTransport =createTransport(this.managementURIs);
+                managementTransport.setName(getName() + "-Management");
+                managementTransport.setExceptionListener(this);
             this.management.setPrev(getPrev());
-            this.management.setNext(getNext());
+            this.management.setNext(managementTransport);
+            managementTransport.setPrev(this.management);
             this.management.init();
         }
+        
     }
 
     /**

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java
(original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java
Tue Jan 27 13:51:37 2009
@@ -31,5 +31,6 @@
         uri += ")";
         fac.getConfiguration().setManagementURI("");
         fac.getConfiguration().setBroadcastURI(uri);
+        fac.getConfiguration().setReliableBroadcast("swp");
     }
 }

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java?rev=738092&r1=738091&r2=738092&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java
(original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java
Tue Jan 27 13:51:37 2009
@@ -30,5 +30,6 @@
         uri += ")";
         fac.getConfiguration().setManagementURI("");
         fac.getConfiguration().setBroadcastURI(uri);
+        fac.getConfiguration().setReliableBroadcast("swp");
     }
 }



Mime
View raw message