activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r734856 - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/network/ main/java/org/apache/activeblaze/impl/processor/ main/java/org/apache/...
Date Thu, 15 Jan 2009 23:48:02 GMT
Author: rajdavies
Date: Thu Jan 15 15:48:02 2009
New Revision: 734856

URL: http://svn.apache.org/viewvc?rev=734856&view=rev
Log:
Use a Network instead of transport for broadcast

Modified:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Thu Jan 15 15:48:02 2009
@@ -22,13 +22,14 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activeblaze.impl.destination.DestinationMatch;
-import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
+import org.apache.activeblaze.impl.network.Network;
+import org.apache.activeblaze.impl.network.NetworkFactory;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
 import org.apache.activeblaze.impl.processor.CompressionProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.impl.processor.FragmentationProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.reliable.ReliableFactory;
-import org.apache.activeblaze.impl.transport.BaseTransport;
-import org.apache.activeblaze.impl.transport.TransportFactory;
 import org.apache.activeblaze.util.IdGenerator;
 import org.apache.activeblaze.util.PropertyUtil;
 import org.apache.activeblaze.wire.BlazeData;
@@ -50,10 +51,10 @@
     protected Buffer producerId;
     protected AtomicLong sequence = new AtomicLong();
     protected AtomicLong session = new AtomicLong(1);
-    private Processor broadcast;
+    protected Processor broadcast;
+    protected Buffer managementURIBuffer;
     protected BlazeConfiguration configuration = new BlazeConfiguration();
     private String id;
-    private Buffer managementURI;
     private InetSocketAddress toAddress;
 
     /**
@@ -85,11 +86,10 @@
 
     /**
      * @param destination
-     * @param l
-     * @return
+     * 
+     * @return the TopicListener
      * @throws Exception
-     * @see org.apache.activeblaze.BlazeChannel#removeBlazeMessageListener(java.lang.String,
-     *      org.apache.activeblaze.BlazeTopicListener)
+     * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(String destination)
      */
     public BlazeTopicListener removeBlazeTopicMessageListener(String destination) throws Exception {
         Buffer key = new Buffer(destination);
@@ -102,26 +102,28 @@
             String broadcastURIStr = getConfiguration().getBroadcastURI();
             broadcastURIStr = PropertyUtil.addPropertiesToURIFromBean(broadcastURIStr, getConfiguration());
             URI broadcastURI = new URI(broadcastURIStr);
+            URI managementURI = null;
+            if (getConfiguration().getManagementURI() != null) {
+                managementURI = new URI(getConfiguration().getManagementURI());
+                this.managementURIBuffer = new Buffer(managementURI.toString());
+            } else {
+                this.managementURIBuffer = new Buffer();
+            }
             this.toAddress = new InetSocketAddress(broadcastURI.getHost(), broadcastURI.getPort());
-            this.managementURI = new Buffer(new URI(getConfiguration().getManagementURI()).toString());
-            BaseTransport transport = TransportFactory.get(broadcastURI);
-            transport.setName(getId() + "-Broadcast");
+            Network transport = NetworkFactory.get(broadcastURI, managementURI);
+            transport.setName(getId());
             this.broadcast = configureProcess(transport);
             this.broadcast.init();
         }
         return result;
     }
 
-    protected final void configureTransport(BaseTransport transport) throws Exception {
-        transport.setMaxPacketSize(getConfiguration().getMaxPacketSize());
-    }
-
-    protected Processor configureProcess(BaseTransport transport) throws Exception {
+    protected final Processor configureProcess(ChainedProcessor transport) throws Exception {
         int maxPacketSize = getConfiguration().getMaxPacketSize();
-        configureTransport(transport);
         CompressionProcessor result = new CompressionProcessor();
         result.setPrev(this);
         result.setExceptionListener(this);
+        result.setMaxPacketSize(maxPacketSize);
         FragmentationProcessor fp = new FragmentationProcessor();
         fp.setMaxPacketSize(maxPacketSize);
         result.setEnd(fp);
@@ -162,7 +164,7 @@
         blazeData.setDestination(new Buffer(destination));
         PacketData packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
         packetData.setReliable(true);
-        packetData.setFromAddress(this.managementURI);
+        packetData.setFromAddress(this.managementURIBuffer);
         Packet packet = new Packet(packetData);
         packet.setTo(this.toAddress);
         this.broadcast.downStream(packet);
@@ -170,7 +172,7 @@
 
     protected synchronized PacketData getPacketData(MessageType type, Message<?> message) {
         PacketData packetData = new PacketData();
-        packetData.setFromAddress(this.managementURI);
+        packetData.setFromAddress(this.managementURIBuffer);
         packetData.setType(type.getNumber());
         packetData.setProducerId(this.producerId);
         packetData.setSessionId(this.session.get());

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Thu Jan 15 15:48:02 2009
@@ -100,7 +100,7 @@
             URI groupManagementURI = new URI(groupManagementURIStr);
             this.toManagementAddress = new InetSocketAddress(groupManagementURI.getHost(), groupManagementURI.getPort());
             this.groupManagementTransport = TransportFactory.get(groupManagementURI);
-            configureTransport(this.groupManagementTransport);
+            this.groupManagementTransport.setMaxPacketSize(getConfiguration().getMaxPacketSize());
             this.groupManagementTransport.setPrev(this);
             this.groupManagementTransport.setName(getId() + "-HeartbeatTransport");
             this.groupManagementTransport.init();
@@ -615,31 +615,9 @@
      * @param message
      * @throws Exception
      */
-    public void broadcastMessage(MessageType messageType, Message<?> message) throws Exception {
-        PacketData data = getPacketData(messageType, message);
-        data.setReliable(false);
-        data.setFromAddress(this.inboxURI);
-        Packet packet = new Packet(data);
-        packet.setTo(this.toManagementAddress);
-        this.groupManagementTransport.downStream(packet);
-    }
 
-    /**
-     * broadcast a general message
-     * 
-     * @param asyncRequest
-     * @param messageType
-     * @param message
-     * @throws Exception
-     */
-    public void broadcastMessage(AsyncGroupRequest asyncRequest, MessageType messageType, Message<?> message)
-            throws Exception {
-        SendRequest request = new SendRequest();
+    public void broadcastMessage(MessageType messageType, Message<?> message) throws Exception {
         PacketData data = getPacketData(messageType, message);
-        asyncRequest.add(data.getMessageId(), request);
-        synchronized (this.messageRequests) {
-            this.messageRequests.put(data.getMessageId(), request);
-        }
         data.setReliable(false);
         data.setFromAddress(this.inboxURI);
         Packet packet = new Packet(data);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java Thu Jan 15 15:48:02 2009
@@ -18,6 +18,7 @@
 
 import java.net.URI;
 import org.apache.activeblaze.ExceptionListener;
+import org.apache.activeblaze.Processor;
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.transport.BaseTransport;
@@ -80,11 +81,15 @@
             this.broadcast  = TransportFactory.get(this.uri);
             this.broadcast.setName(getName() + "-Broadcast");
             this.broadcast.setExceptionListener(this);
+            this.broadcast.setPrev(getPrev());
+            this.broadcast.setNext(getNext());
             this.broadcast.init();
             if(this.managementURI != null && !this.managementURI.equals(this.uri)){
                 this.management =  TransportFactory.get(this.managementURI);
                 this.management.setName(getName() + "-Management");
                 this.management.setExceptionListener(this);
+                this.management.setPrev(getPrev());
+                this.management.setNext(getNext());
                 this.management.init();
                 
             }
@@ -184,7 +189,35 @@
         }
         
     }
+    
+    public void setNext(Processor next) {
+        super.setNext(next);
+        if (this.management != null) {
+            this.management.setNext(next);
+        }
+        if(this.broadcast!=null){
+            this.broadcast.setNext(next);
+        }
+    }
+    
+    public void setPrev(Processor prev) {
+        super.setPrev(prev);
+        if (this.management != null) {
+            this.management.setPrev(prev);;
+        }
+        if(this.broadcast!=null){
+            this.broadcast.setPrev(prev);
+        }
+    }
 
+    public void setMaxPacketSize(int maxPacketSize) {
+        if (this.management != null) {
+            this.management.setMaxPacketSize(maxPacketSize);
+        }
+        if(this.broadcast!=null){
+            this.broadcast.setMaxPacketSize(maxPacketSize);
+        }
+    }
     
     
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java Thu Jan 15 15:48:02 2009
@@ -16,8 +16,7 @@
  */
 package org.apache.activeblaze.impl.network;
 import java.net.URI;
-import org.apache.activeblaze.Service;
-import org.apache.activeblaze.Processor;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
 
 /**
@@ -26,7 +25,7 @@
  * channel instances
  * 
  */
-public interface Network extends Processor, Service {
+public interface Network extends ChainedProcessor{
     
     /**
      * @return the name

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java Thu Jan 15 15:48:02 2009
@@ -25,17 +25,24 @@
 public class NetworkFactory {
     
     /**
-     * @param location
+     * @param broadcast 
+     * @param management 
      * @return the network associated with the URI
      * @throws Exception
      */
-    public static Network get(URI location) throws Exception {
+    public static Network get(URI broadcast, URI management) throws Exception {
         Network result = null;
-        String scheme = location.getScheme();
+        String scheme = broadcast.getScheme();
         scheme = scheme.trim();
         if (scheme.equalsIgnoreCase("mcast") || scheme.equalsIgnoreCase("multicast")){
             result = new MulticastNetwork();
         }
+        if(result != null) {
+            result.setURI(broadcast);
+            if(management!=null) {
+                result.setManagementURI(management);
+            }
+        }
         return result;
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java Thu Jan 15 15:48:02 2009
@@ -17,6 +17,7 @@
 package org.apache.activeblaze.impl.processor;
 
 import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.BlazeConfiguration;
 import org.apache.activeblaze.BlazeException;
 import org.apache.activeblaze.ExceptionListener;
 import org.apache.activeblaze.Processor;
@@ -24,89 +25,88 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * Chains Processors together
+ * Default implementation of a ChainedProcessor
  * 
  */
-public class DefaultChainedProcessor extends BaseService implements Processor {
+public class DefaultChainedProcessor extends BaseService implements ChainedProcessor {
     private static final Log LOG = LogFactory.getLog(DefaultChainedProcessor.class);
     private Processor next;
     private Processor prev;
     protected ExceptionListener exceptionListener;
+    private int maxPacketSize = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;
 
     protected DefaultChainedProcessor() {
     }
 
     /**
-     * @return the next
+     * @return the next Processor
+     * @see org.apache.activeblaze.impl.processor.ChainedProcessor#getNext()
      */
     public Processor getNext() {
         return this.next;
     }
 
     /**
-     * Set Next at the end of the chain
-     * 
      * @param next
-     * 
+     * @see org.apache.activeblaze.impl.processor.ChainedProcessor#setEnd(org.apache.activeblaze.Processor)
      */
     public void setEnd(Processor next) {
-        DefaultChainedProcessor target = this;
+        ChainedProcessor target = this;
         Processor n = getNext();
         while (n != null) {
-            if (n instanceof DefaultChainedProcessor) {
-                DefaultChainedProcessor cn = (DefaultChainedProcessor) n;
+            if (n instanceof ChainedProcessor) {
+                ChainedProcessor cn = (ChainedProcessor) n;
                 target = cn;
                 n = cn.getNext();
             }
         }
-        if (next instanceof DefaultChainedProcessor) {
-            target.setNextChain((DefaultChainedProcessor) next);
+        if (next instanceof ChainedProcessor) {
+            target.setNextChain((ChainedProcessor) next);
         } else {
-            target.next = next;
+            target.setNext(next);
         }
     }
 
     /**
-     * Set the next
-     * 
      * @param next
+     * @see org.apache.activeblaze.impl.processor.ChainedProcessor#setNext(org.apache.activeblaze.Processor)
      */
     public void setNext(Processor next) {
         this.next = next;
     }
 
     /**
-     * @return the prev
+     * @return previous processor
+     * @see org.apache.activeblaze.impl.processor.ChainedProcessor#getPrev()
      */
     public Processor getPrev() {
         return this.prev;
     }
 
     /**
-     * Set the next chain
-     * 
      * @param p
+     * @see org.apache.activeblaze.impl.processor.ChainedProcessor#setNextChain(org.apache.activeblaze.impl.processor.ChainedProcessor)
      */
-    public void setNextChain(DefaultChainedProcessor p) {
-        DefaultChainedProcessor target = this;
+    public void setNextChain(ChainedProcessor p) {
+        ChainedProcessor target = this;
         Processor n = getNext();
         while (n != null) {
-            if (n instanceof DefaultChainedProcessor) {
-                DefaultChainedProcessor cn = (DefaultChainedProcessor) n;
+            if (n instanceof ChainedProcessor) {
+                ChainedProcessor cn = (ChainedProcessor) n;
                 target = cn;
                 n = cn.getNext();
             }
         }
-        target.next = p;
+        target.setNext(p);
         p.setPrev(target);
-        if (this.exceptionListener != null && p.exceptionListener == null) {
-            p.exceptionListener = this.exceptionListener;
+        if (this.exceptionListener != null && p.getExceptionListener() == null) {
+            p.setExceptionListener(this.exceptionListener);
         }
     }
 
     /**
      * @param prev
-     *            the prev to set
+     * @see org.apache.activeblaze.impl.processor.ChainedProcessor#setPrev(org.apache.activeblaze.Processor)
      */
     public void setPrev(Processor prev) {
         this.prev = prev;
@@ -191,4 +191,26 @@
             LOG.error("Caught an exception stopping", e);
         }
     }
+
+    /**
+     * @return the exceptionListener
+     * @see org.apache.activeblaze.impl.processor.ChainedProcessor#getExceptionListener()
+     */
+    public ExceptionListener getExceptionListener() {
+       return this.exceptionListener;
+    }
+    
+    /**
+     * @return the maxPacketSize
+     */
+    public int getMaxPacketSize() {
+        return this.maxPacketSize;
+    }
+
+    /**
+     * @param maxPacketSize the maxPacketSize to set
+     */
+    public void setMaxPacketSize(int maxPacketSize) {
+        this.maxPacketSize = maxPacketSize;
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java Thu Jan 15 15:48:02 2009
@@ -20,7 +20,6 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.activeblaze.BlazeConfiguration;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,7 +31,6 @@
 @SuppressWarnings("serial")
 public class FragmentationProcessor extends DefaultChainedProcessor {
     private static final Log LOG = LogFactory.getLog(FragmentationProcessor.class);
-    private int maxPacketSize = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;
     private int maxCacheSize = 16 * 1024;
     private Map<String,List<Packet>> cache = new LinkedHashMap<String,List<Packet>>() {
         protected boolean removeEldestEntry(Map.Entry<String,List<Packet>> eldest) {
@@ -110,19 +108,7 @@
         }
     }
 
-    /**
-     * @return the maxPacketSize
-     */
-    public int getMaxPacketSize() {
-        return this.maxPacketSize;
-    }
-
-    /**
-     * @param maxPacketSize the maxPacketSize to set
-     */
-    public void setMaxPacketSize(int maxPacketSize) {
-        this.maxPacketSize = maxPacketSize;
-    }
+    
 
     /**
      * @return the maxCacheSize

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java Thu Jan 15 15:48:02 2009
@@ -17,6 +17,7 @@
 package org.apache.activeblaze.impl.reliable;
 
 import java.util.Map;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.util.ObjectFinder;
 import org.apache.activeblaze.util.PropertyUtil;
@@ -40,7 +41,7 @@
         return result;
     }
     
-    static void configure(DefaultChainedProcessor transport, String location) throws Exception {
+    static void configure(ChainedProcessor transport, String location) throws Exception {
         Map<String, String> options = PropertyUtil.parseParameters(location);
         PropertyUtil.setProperties(transport, options);
     }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Thu Jan 15 15:48:02 2009
@@ -31,11 +31,9 @@
  */
 public abstract class BaseTransport extends ThreadChainedProcessor {
     private static final Log LOG = LogFactory.getLog(BaseTransport.class);
-    static final int DEFAULT_MAX_PACKET_SIZE = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;
     static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
     private URI localURI;
     private Buffer bufferOfLocalURI;
-    private int maxPacketSize = DEFAULT_MAX_PACKET_SIZE;
     private int bufferSize = DEFAULT_BUFFER_SIZE;
     private int soTimeout = 2000;
     private int timeToLive = 1;
@@ -117,22 +115,6 @@
             this.bufferOfLocalURI = new Buffer(this.localURI.toString());
         }
     }
-
-    /**
-     * @return the maxPacketSize
-     */
-    public int getMaxPacketSize() {
-        return this.maxPacketSize;
-    }
-
-    /**
-     * @param maxPacketSize
-     *            the maxPacketSize to set
-     */
-    public void setMaxPacketSize(int maxPacketSize) {
-        this.maxPacketSize = maxPacketSize;
-    }
-
     /**
      * @return the bufferSize
      */

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java Thu Jan 15 15:48:02 2009
@@ -53,7 +53,7 @@
         latch.await(10, TimeUnit.SECONDS);
         receiver.stop();
         sender.stop();
-        assertEquals("Not enough messages", 0, latch.getCount());
+        assertEquals("Too many messages not sent ", 0, latch.getCount());
     }
 
     public void testGroupBroadcast() throws Exception {

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java?rev=734856&r1=734855&r2=734856&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java Thu Jan 15 15:48:02 2009
@@ -79,7 +79,7 @@
 
     public void testUpStream() throws Exception {
         final AtomicBoolean test = new AtomicBoolean();
-        DefaultChainedProcessor target = new DefaultChainedProcessor() {
+        ChainedProcessor target = new DefaultChainedProcessor() {
             public void upStream(Packet p) {
                 test.set(true);
             }



Mime
View raw message