activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r738017 [1/2] - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/cluster/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/network/ main/java/org/apache/a...
Date Tue, 27 Jan 2009 07:30:18 GMT
Author: rajdavies
Date: Tue Jan 27 07:30:16 2009
New Revision: 738017

URL: http://svn.apache.org/viewvc?rev=738017&view=rev
Log:
More fixes for pointcast

Added:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReceivedPacket.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableBuffer.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliablePacket.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/package.html
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleFlow.java
      - copied, changed from r734779, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/package.html
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/package.html
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LinkedNode.java   (with props)
    activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/reliable/swp
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkProcessor.java   (with props)
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkSwitch.java   (with props)
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java   (with props)
Removed:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/
Modified:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java
    activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java Tue Jan 27 07:30:16 2009
@@ -20,64 +20,75 @@
 
 /**
  * Implementation of a Service
- *
+ * 
  */
-public class BaseService implements Service {
-
+public abstract class BaseService implements Service {
     AtomicBoolean initialialized = new AtomicBoolean();
     AtomicBoolean started = new AtomicBoolean();
-    AtomicBoolean hasStarted = new AtomicBoolean();
-   
-    public boolean init() throws Exception {
-        boolean result =  this.initialialized.compareAndSet(false, true);
+
+    protected abstract void doInit() throws Exception;
+
+    protected abstract void doShutDown() throws Exception;
+
+    protected abstract void doStart() throws Exception;
+
+    protected abstract void doStop() throws Exception;
+
+    public final boolean init() throws Exception {
+        boolean result = this.initialialized.compareAndSet(false, true);
         this.initialialized.set(true);
+        if (result) {
+            doInit();
+        }
         return result;
     }
 
-    
-    public boolean shutDown() throws Exception {
+    public final boolean shutDown() throws Exception {
         if (isStarted()) {
             stop();
         }
-        boolean result =  this.initialialized.compareAndSet(true, false);
+        boolean result = this.initialialized.compareAndSet(true, false);
         this.initialialized.set(false);
+        if (result) {
+            doShutDown();
+        }
         return result;
     }
 
-    
-    public boolean start() throws Exception {
+    public final boolean start() throws Exception {
         if (!this.initialialized.get()) {
             init();
         }
-        boolean result =  this.started.compareAndSet(false, true); 
+        boolean result = this.started.compareAndSet(false, true);
         this.started.set(true);
-        this.hasStarted.set(true);
+        if (result) {
+            doStart();
+        }
         return result;
     }
 
-    
-    public boolean stop() throws Exception {
+    public final boolean stop() throws Exception {
         boolean result = this.started.compareAndSet(true, false);
         this.started.set(false);
+        if (result) {
+            doStop();
+        }
         return result;
     }
-    
-    public boolean isStarted() {
+
+    public final boolean isStarted() {
         return this.started.get();
     }
-    
-    public boolean isStopped() {
+
+    public final boolean isStopped() {
         return !isStarted();
     }
-   
-    public boolean isInitialized() {
+
+    public final boolean isInitialized() {
         return this.initialialized.get();
     }
 
-    public boolean isShutDown() {
-       return !isInitialized();
-    }
-    public boolean hasStarted() {
-        return hasStarted.get();
+    public final boolean isShutDown() {
+        return !isInitialized();
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Tue Jan 27 07:30:16 2009
@@ -19,7 +19,6 @@
 import java.net.URI;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activeblaze.impl.destination.DestinationMatch;
 import org.apache.activeblaze.impl.network.Network;
 import org.apache.activeblaze.impl.network.NetworkFactory;
@@ -48,8 +47,6 @@
     protected Map<Buffer, BlazeTopicListener> topicessageListenerMap = new ConcurrentHashMap<Buffer, BlazeTopicListener>();
     protected final IdGenerator idGenerator = new IdGenerator();
     protected Buffer producerId;
-    protected AtomicLong sequence = new AtomicLong();
-    protected AtomicLong session = new AtomicLong(1);
     protected Processor broadcast;
     protected BlazeConfiguration configuration = new BlazeConfiguration();
     private String id;
@@ -93,26 +90,23 @@
         return this.topicessageListenerMap.remove(key);
     }
 
-    public boolean init() throws Exception {
-        boolean result = super.init();
-        if (result) {
-            String broadcastURIStr = getConfiguration().getBroadcastURI();
-            broadcastURIStr = PropertyUtil.addPropertiesToURIFromBean(broadcastURIStr, getConfiguration());
-            URI broadcastURI = new URI(broadcastURIStr);
-            URI managementURI = null;
-            String managementURIStr = getConfiguration().getManagementURI();
-            if (managementURIStr != null && managementURIStr.length() > 0) {
-                managementURI = new URI(getConfiguration().getManagementURI());
-            }
-            Network transport = NetworkFactory.get(broadcastURI, managementURI);
-            transport.setName(getId());
-            this.broadcast = configureProcess(transport);
-            this.broadcast.init();
-        }
-        return result;
+    public void doInit() throws Exception {
+        super.doInit();
+        String broadcastURIStr = getConfiguration().getBroadcastURI();
+        broadcastURIStr = PropertyUtil.addPropertiesToURIFromBean(broadcastURIStr, getConfiguration());
+        URI broadcastURI = new URI(broadcastURIStr);
+        URI managementURI = null;
+        String managementURIStr = getConfiguration().getManagementURI();
+        if (managementURIStr != null && managementURIStr.length() > 0) {
+            managementURI = new URI(getConfiguration().getManagementURI());
+        }
+        Network transport = NetworkFactory.get(broadcastURI, managementURI);
+        transport.setName(getId());
+        this.broadcast = configureProcess(transport, getConfiguration().getReliableBroadcast());
+        this.broadcast.init();
     }
 
-    protected final Processor configureProcess(ChainedProcessor transport) throws Exception {
+    protected final Processor configureProcess(ChainedProcessor transport, String reliability) throws Exception {
         int maxPacketSize = getConfiguration().getMaxPacketSize();
         CompressionProcessor result = new CompressionProcessor();
         result.setPrev(this);
@@ -121,34 +115,30 @@
         FragmentationProcessor fp = new FragmentationProcessor();
         fp.setMaxPacketSize(maxPacketSize);
         result.setEnd(fp);
-        DefaultChainedProcessor reliable = ReliableFactory.get(getConfiguration().getReliable());
+        ChainedProcessor reliable = getReliability(reliability);
         result.setEnd(reliable);
         result.setEnd(transport);
         return result;
     }
 
-    public boolean shutDown() throws Exception {
-        boolean result = super.shutDown();
-        if (result) {
-            this.broadcast.shutDown();
-        }
-        return result;
+    protected ChainedProcessor getReliability(String reliability) throws Exception {
+        DefaultChainedProcessor reliable = ReliableFactory.get(reliability);
+        return reliable;
     }
 
-    public boolean start() throws Exception {
-        boolean result = super.start();
-        if (true ||result) {
-            this.broadcast.start();
-        }
-        return result;
+    public void doShutDown() throws Exception {
+        super.doShutDown();
+        this.broadcast.shutDown();
     }
 
-    public boolean stop() throws Exception {
-        boolean result = super.stop();
-        if (result) {
-            this.broadcast.stop();
-        }
-        return result;
+    public void doStart() throws Exception {
+        super.doStart();
+        this.broadcast.start();
+    }
+
+    public void doStop() throws Exception {
+        super.doStop();
+        this.broadcast.stop();
     }
 
     public synchronized void broadcast(String destination, BlazeMessage msg) throws Exception {
@@ -166,13 +156,8 @@
         PacketData packetData = new PacketData();
         packetData.setType(type.getNumber());
         packetData.setProducerId(this.producerId);
-        packetData.setSessionId(this.session.get());
-        long sequence = this.sequence.incrementAndGet();
-        packetData.setMessageSequence(sequence);
         packetData.setPayload(message.toFramedBuffer());
-        StringBuilder builder = new StringBuilder(this.id.length() + 32);
-        builder.append(this.id).append(":").append(sequence);
-        packetData.setMessageId(new Buffer(builder.toString()));
+        packetData.setMessageId(new Buffer(this.idGenerator.generateId()));
         return packetData;
     }
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java Tue Jan 27 07:30:16 2009
@@ -35,7 +35,7 @@
     private int maxDispatchQueueSize = 10000;
     private int maxPacketSize = DEFAULT_MAX_PACKET_SIZE;
     // reliability
-    private String reliable = "simple";
+    private String reliableBroadcast = "simple";
 
     /**
      * @return the unicastPort
@@ -132,15 +132,15 @@
     /**
      * @return the reliable
      */
-    public String getReliable() {
-        return this.reliable;
+    public String getReliableBroadcast() {
+        return this.reliableBroadcast;
     }
 
     /**
      * @param reliable
      *            the reliable to set
      */
-    public void setReliable(String reliable) {
-        this.reliable = reliable;
+    public void setReliableBroadcast(String reliable) {
+        this.reliableBroadcast = reliable;
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java Tue Jan 27 07:30:16 2009
@@ -59,40 +59,28 @@
         return this.state;
     }
 
-    public boolean init() throws Exception {
-        boolean result = super.init();
-        if (result) {
-            this.clusterGroup.init();
-            this.state.init();
-        }
-        return result;
+    public void doInit() throws Exception {
+        super.doInit();
+        this.clusterGroup.init();
+        this.state.init();
     }
 
-    public boolean shutDown() throws Exception {
-        boolean result = super.shutDown();
-        if (result) {
-            this.clusterGroup.shutDown();
-            this.state.shutDown();
-        }
-        return result;
+    public void doShutDown() throws Exception {
+        super.doShutDown();
+        this.clusterGroup.shutDown();
+        this.state.shutDown();
     }
 
-    public boolean start() throws Exception {
-        boolean result = super.start();
-        if (result) {
-            this.clusterGroup.start();
-            this.state.start();
-        }
-        return result;
+    public void doStart() throws Exception {
+        super.doStart();
+        this.clusterGroup.start();
+        this.state.start();
     }
 
-    public boolean stop() throws Exception {
-        boolean result = super.stop();
-        if (result) {
-            this.clusterGroup.stop();
-            this.state.stop();
-        }
-        return result;
+    public void doStop() throws Exception {
+        super.doStop();
+        this.clusterGroup.stop();
+        this.state.stop();
     }
 
     /**
@@ -106,7 +94,7 @@
     }
 
     /**
-     * @return
+     * @return Member
      * @throws Exception
      * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#getMaster()
      */
@@ -118,7 +106,7 @@
     }
 
     /**
-     * @return
+     * @return true if the Master
      * @throws Exception
      * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#isMaster()
      */
@@ -141,7 +129,7 @@
     }
 
     /**
-     * @return
+     * @return the configuration
      * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#getCoordinatedGroupConfiguration()
      */
     public BlazeClusterGroupConfiguration getConfiguration() {
@@ -152,7 +140,7 @@
      * @param timeout
      * @return true if election is finished
      * @throws Exception
-     * @see org.apache.activeblaze.group.BlazeGroupChannel#waitForElection(int)
+     * @see {@link org.apache.activeblaze.group.BlazeGroupChannel#waitForElection(int)
      */
     public boolean waitForElection(int timeout) throws Exception {
         init();
@@ -188,7 +176,7 @@
 
     protected MemberImpl createLocal(URI uri) throws Exception {
         BlazeClusterGroupConfiguration c = getConfiguration();
-        return new MemberImpl(getId(), getName(), c.getMasterWeight(),c.getRefinedMasterWeight(),uri);
+        return new MemberImpl(getId(), getName(), c.getMasterWeight(), c.getRefinedMasterWeight(), uri);
     }
 
     public MemberImpl getLocalMember() {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java Tue Jan 27 07:30:16 2009
@@ -46,6 +46,7 @@
     private MemberImpl master;
     private List<MasterChangedListener> listeners = new CopyOnWriteArrayList<MasterChangedListener>();
     final AtomicBoolean electionFinished = new AtomicBoolean(false);
+
     /**
      * Constructor
      * 
@@ -62,48 +63,39 @@
     }
 
     /**
-     * @return
      * @throws Exception
      * @see org.apache.activeblaze.Service#start()
      */
-    public boolean start() throws Exception {
-        boolean result = super.start();
-        if (result) {
-            this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
-                    new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
-                        public Thread newThread(Runnable runnable) {
-                            Thread thread = new Thread(runnable, "Election{" + ClusterGroup.this.channel.getId() + "}");
-                            thread.setDaemon(true);
-                            return thread;
-                        }
-                    });
-            election(null, true);
-        }
-        return result;
+    public void doStart() throws Exception {
+        super.doStart();
+        this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+                    public Thread newThread(Runnable runnable) {
+                        Thread thread = new Thread(runnable, "Election{" + ClusterGroup.this.channel.getId() + "}");
+                        thread.setDaemon(true);
+                        return thread;
+                    }
+                });
+        election(null, true);
     }
 
     /**
-     * @return
      * @throws Exception
      * @see org.apache.activeblaze.Service#stop()
      */
-    public boolean stop() throws Exception {
-        boolean result = super.stop();
-        if (result) {
-            if (this.electionExecutor != null) {
-                this.electionExecutor.shutdownNow();
-                synchronized(this.electionFinished) {
-                    this.electionFinished.notifyAll();
-                }
+    public void doStop() throws Exception {
+        super.doStop();
+        if (this.electionExecutor != null) {
+            this.electionExecutor.shutdownNow();
+            synchronized (this.electionFinished) {
+                this.electionFinished.notifyAll();
             }
         }
-        return result;
     }
 
-    public boolean shutDown() throws Exception {
-        boolean result = super.shutDown();
+    public void doShutDown() throws Exception {
+        super.doShutDown();
         setMaster(null);
-        return result;
     }
 
     /**
@@ -197,7 +189,7 @@
     protected void setMaster(MemberImpl member) {
         MemberImpl oldMaster = this.master;
         this.master = member;
-        if (oldMaster == null || (oldMaster != null && this.master != null && !this.master.equals(oldMaster))) {
+        if (oldMaster == null || (this.master != null && !this.master.equals(oldMaster))) {
             fireClusterChanged(this.master);
         }
     }
@@ -230,7 +222,7 @@
 
     void processElectionMessage(ElectionMessage msg, String correlationId) throws Exception {
         MemberImpl from = new MemberImpl(msg.getMember());
-        if (from != null && !from.getId().equals(getLocalMember().getId())) {
+        if (!from.getId().equals(getLocalMember().getId())) {
             LOG.debug(getLocalMember() + " Election message " + msg.getElectionType() + " from " + from);
             if (msg.getElectionType().equals(ElectionType.ELECTION)) {
                 ElectionMessage reply = new ElectionMessage();

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java Tue Jan 27 07:30:16 2009
@@ -444,74 +444,58 @@
         this.requestTimeout = requestTimeout;
     }
 
-    public boolean init() throws Exception {
-        boolean result = super.init();
-        if (result) {
-            this.channel.addMemberChangedListener(this);
-            this.dispatchQueue = new LinkedBlockingQueue<PacketData>(getMaxDispatchQueueSize());
-        }
-        return result;
-    }
-
-    public boolean shutDown() throws Exception {
-        boolean result = super.shutDown();
-        if (result) {
-            this.channel.removeMemberChangedListener(this);
-        }
-        return result;
-    }
-
-    public boolean start() throws Exception {
-        boolean result = super.start();
-        if (result) {
-            this.stateChangedExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
-                public Thread newThread(Runnable runnable) {
-                    Thread thread = new Thread(runnable, "Cluster State{" + ClusterState.this.channel.getLocalMember()
-                            + "}");
-                    thread.setDaemon(true);
-                    return thread;
-                }
-            });
-            TimerTask task = new TimerTask() {
-                @Override
-                public void run() {
-                    try {
-                        expirationSweep();
-                    } catch (Exception e) {
-                        ClusterState.LOG.error("Failed to do expiration sweep", e);
-                    }
+    public void doInit() throws Exception {
+        this.channel.addMemberChangedListener(this);
+        this.dispatchQueue = new LinkedBlockingQueue<PacketData>(getMaxDispatchQueueSize());
+    }
+
+    public void doShutDown() throws Exception {
+        this.channel.removeMemberChangedListener(this);
+    }
+
+    public void doStart() throws Exception {
+        this.stateChangedExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "Cluster State{" + ClusterState.this.channel.getLocalMember()
+                        + "}");
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+        TimerTask task = new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    expirationSweep();
+                } catch (Exception e) {
+                    ClusterState.LOG.error("Failed to do expiration sweep", e);
                 }
-            };
-            this.expirationTimer = new Timer(true);
-            this.expirationTimer.scheduleAtFixedRate(task, 500, 500);
-            Runnable runable = new Runnable() {
-                public void run() {
-                    while (isStarted()) {
-                        dequeuePackets();
-                    }
+            }
+        };
+        this.expirationTimer = new Timer(true);
+        this.expirationTimer.scheduleAtFixedRate(task, 500, 500);
+        Runnable runable = new Runnable() {
+            public void run() {
+                while (isStarted()) {
+                    dequeuePackets();
                 }
-            };
-            this.dispatchQueueThread = new Thread(runable, toString() + "-DispatchQueue");
-            this.dispatchQueueThread.setDaemon(true);
-            this.dispatchQueueThread.start();
-        }
-        return result;
+            }
+        };
+        this.dispatchQueueThread = new Thread(runable, toString() + "-DispatchQueue");
+        this.dispatchQueueThread.setDaemon(true);
+        this.dispatchQueueThread.start();
     }
 
-    public boolean stop() throws Exception {
-        boolean result = super.stop();
-        if (result) {
-            if (this.dispatchQueueThread != null) {
-                this.dispatchQueueThread.interrupt();
-                try {
-                    this.dispatchQueueThread.join(100);
-                } catch (InterruptedException e) {
-                }
+    public void doStop() throws Exception {
+        if (this.dispatchQueueThread != null) {
+            this.dispatchQueueThread.interrupt();
+            try {
+                this.dispatchQueueThread.join(100);
+            } catch (InterruptedException e) {
             }
-            this.stateChangedExecutor.shutdown();
-            this.expirationTimer.cancel();
         }
-        return result;
+        this.stateChangedExecutor.shutdown();
+        this.expirationTimer.cancel();
     }
 
     protected void stopInternal() {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java Tue Jan 27 07:30:16 2009
@@ -114,4 +114,44 @@
         MemberImpl result = sorted.isEmpty() ? this.group.getLocalMember() : sorted.get(list.size() - 1);
         return result;
     }
+
+    /**
+     * @throws Exception
+     * @see org.apache.activeblaze.BaseService#doInit()
+     */
+    @Override
+    protected void doInit() throws Exception {
+        // TODO Auto-generated method stub
+        
+    }
+
+    /**
+     * @throws Exception
+     * @see org.apache.activeblaze.BaseService#doShutDown()
+     */
+    @Override
+    protected void doShutDown() throws Exception {
+        // TODO Auto-generated method stub
+        
+    }
+
+    /**
+     * @throws Exception
+     * @see org.apache.activeblaze.BaseService#doStart()
+     */
+    @Override
+    protected void doStart() throws Exception {
+        // TODO Auto-generated method stub
+        
+    }
+
+    /**
+     * @throws Exception
+     * @see org.apache.activeblaze.BaseService#doStop()
+     */
+    @Override
+    protected void doStop() throws Exception {
+        // TODO Auto-generated method stub
+        
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Tue Jan 27 07:30:16 2009
@@ -29,7 +29,10 @@
 import org.apache.activeblaze.BlazeTopicListener;
 import org.apache.activeblaze.Processor;
 import org.apache.activeblaze.impl.destination.DestinationMatch;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.reliable.ReliableFactory;
 import org.apache.activeblaze.impl.transport.BaseTransport;
 import org.apache.activeblaze.impl.transport.TransportFactory;
 import org.apache.activeblaze.util.AsyncGroupRequest;
@@ -75,35 +78,35 @@
     }
 
     /**
-     * @return true if initialized
      * @throws Exception
      * @see org.apache.activeblaze.Service#init()
      */
-    public boolean init() throws Exception {
-        boolean result = super.init();
-        if (result) {
-            String unicastURIStr = getConfiguration().getUnicastURI();
-            unicastURIStr = PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
-            URI unicastURI = new URI(unicastURIStr);
-            
-            BaseTransport transport = TransportFactory.get(unicastURI);
-            transport.setName(getId() + "-Unicast");
-            this.unicast = configureProcess(transport);
-            this.unicast.init();
-            // if using a port of zero - the port will be assigned automatically,
-            // so need to get the potentially new value
-            unicastURI = transport.getLocalURI();
-            InetSocketAddress addr = new InetSocketAddress(unicastURI.getHost(),unicastURI.getPort());
-            this.inboxAddress=new Buffer(addr.getAddress().getAddress());
-            this.inBoxPort=addr.getPort();
-            this.local = createLocal(unicastURI);
-            this.group = createGroup();
-        }
-        return result;
+    public void doInit() throws Exception {
+        super.doInit();
+        String unicastURIStr = getConfiguration().getUnicastURI();
+        unicastURIStr = PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
+        URI unicastURI = new URI(unicastURIStr);
+        BaseTransport transport = TransportFactory.get(unicastURI);
+        transport.setName(getId() + "-Unicast");
+        this.unicast = configureProcess(transport,getConfiguration().getReliableUnicast());
+        this.unicast.init();
+        // if using a port of zero - the port will be assigned automatically,
+        // so need to get the potentially new value
+        unicastURI = transport.getLocalURI();
+        InetSocketAddress addr = new InetSocketAddress(unicastURI.getHost(), unicastURI.getPort());
+        this.inboxAddress = new Buffer(addr.getAddress().getAddress());
+        this.inBoxPort = addr.getPort();
+        this.local = createLocal(unicastURI);
+        this.group = createGroup();
+    }
+
+    protected ChainedProcessor getReliability(String reliability) throws Exception {
+        DefaultChainedProcessor reliable = ReliableFactory.get(reliability);
+        return reliable;
     }
 
     protected MemberImpl createLocal(URI uri) throws Exception {
-        return new MemberImpl(getId(), getName(), 0,0, uri);
+        return new MemberImpl(getId(), getName(), 0, 0, uri);
     }
 
     protected Group createGroup() {
@@ -115,14 +118,10 @@
      * @throws Exception
      * @see org.apache.activeblaze.Service#shutDown()
      */
-    public boolean shutDown() throws Exception {
-        boolean result = super.shutDown();
-        if (result) {
-            this.group.shutDown();
-           
-            this.unicast.shutDown();
-        }
-        return result;
+    public void doShutDown() throws Exception {
+        super.doShutDown();
+        this.group.shutDown();
+        this.unicast.shutDown();
     }
 
     /**
@@ -130,13 +129,10 @@
      * @throws Exception
      * @see org.apache.activeblaze.Service#start()
      */
-    public boolean start() throws Exception {
-        boolean result = super.start();
-        if (result) {
-            this.unicast.start();
-            this.group.start();
-        }
-        return result;
+    public void doStart() throws Exception {
+        super.doStart();
+        this.unicast.start();
+        this.group.start();
     }
 
     /**
@@ -144,14 +140,10 @@
      * @throws Exception
      * @see org.apache.activeblaze.Service#stop()
      */
-    public boolean stop() throws Exception {
-        boolean result = super.stop();
-        if (result) {
-            this.group.stop();
-          
-            this.unicast.stop();
-        }
-        return result;
+    public void doStop() throws Exception {
+        super.doStop();
+        this.group.stop();
+        this.unicast.stop();
     }
 
     /**
@@ -222,7 +214,7 @@
 
     /**
      * @param id
-     * @return
+     * @return the Member
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberById(java.lang.String)
      */
@@ -233,7 +225,7 @@
 
     /**
      * @param name
-     * @return
+     * @return the Member
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberByName(java.lang.String)
      */
@@ -256,7 +248,7 @@
     }
 
     /**
-     * @return
+     * @return the members
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getMembers()
      */
@@ -282,14 +274,7 @@
                     send(member, key, message);
                     return;
                 } catch (BlazeNoRouteException e) {
-                   LOG.debug("No response - resending to another client",e);
-                   
-                   
-                   
-                   
-                   
-                   
-                   
+                    LOG.debug("No response - resending to another client", e);
                 }
             } else {
                 return;
@@ -299,7 +284,6 @@
 
     /**
      * @param member
-     * @param destination
      * @param message
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#send(org.apache.activeblaze.group.Member,
@@ -312,7 +296,7 @@
     /**
      * @param member
      * @param message
-     * @return
+     * @return the response
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage)
@@ -325,7 +309,7 @@
      * @param member
      * @param message
      * @param timeout
-     * @return
+     * @return the response
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage, int)
@@ -337,7 +321,7 @@
     /**
      * @param destination
      * @param message
-     * @return
+     * @return the response
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage)
@@ -350,7 +334,7 @@
      * @param destination
      * @param message
      * @param timeout
-     * @return
+     * @return the response
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage, int)
@@ -390,7 +374,7 @@
      * @param destination
      * @param message
      * @param timeout
-     * @return
+     * @return the response
      * @throws Exception
      */
     public BlazeMessage sendRequest(MemberImpl member, Buffer destination, BlazeMessage message, int timeout)
@@ -468,9 +452,9 @@
 
     /**
      * @param destination
-     * @return
+     * @return the removed <Code>BlazeQueueListener</Code>
      * @throws Exception
-     * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeGroupMessageListener(java.lang.String)
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
      */
     public BlazeQueueListener removeBlazeQueueMessageListener(String destination) throws Exception {
         init();
@@ -495,11 +479,9 @@
 
     /**
      * @param destination
-     * @param l
-     * @return
+     * @return the removed <Code>BlazeTopicListener</Code>
      * @throws Exception
-     * @see org.apache.activeblaze.BlazeChannel#removeBlazeMessageListener(java.lang.String,
-     *      org.apache.activeblaze.BlazeTopicListener)
+     * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(java.lang.String)
      */
     public BlazeTopicListener removeBlazeTopicMessageListener(String destination) throws Exception {
         init();
@@ -507,40 +489,37 @@
         buildLocal();
         return result;
     }
-    
+
     /**
      * @param groupName
-     * @throws Exception 
+     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addToGroup(java.lang.String)
      */
     public void addToGroup(String groupName) throws Exception {
         init();
         this.local.addToGroup(groupName);
-        
     }
-    
+
     /**
      * @param groupName
-     * @throws Exception 
+     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeFromGroup(java.lang.String)
      */
     public void removeFromGroup(String groupName) throws Exception {
-      init();
-      this.local.removeFromGroup(groupName);
-        
+        init();
+        this.local.removeFromGroup(groupName);
     }
 
     /**
-     * @return
-     * @throws Exception 
+     * @return the groups
+     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getGroups()
      */
     public List<String> getGroups() throws Exception {
-       init();
-       return this.local.getGroups();
+        init();
+        return this.local.getGroups();
     }
 
-
     protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
         if (isStarted()) {
             if (!processRequest(correlationId, data)) {
@@ -570,7 +549,7 @@
     }
 
     protected void doProcessBlazeData(PacketData data) throws Exception {
-        BlazeMessage message = (BlazeMessage) buildBlazeMessage(data);
+        BlazeMessage message = buildBlazeMessage(data);
         if (message.getContent().getTopic()) {
             dispatch(message);
         } else {
@@ -611,7 +590,6 @@
      * @param message
      * @throws Exception
      */
-
     public void broadcastMessage(MessageType messageType, Message<?> message) throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setReliable(false);
@@ -621,14 +599,15 @@
 
     /**
      * send a message
+     * 
      * @param asyncRequest
      * @param member
      * @param messageType
      * @param message
      * @throws Exception
      */
-    public void sendMessage(AsyncGroupRequest asyncRequest,MemberImpl member, MessageType messageType, Message<?> message)
-            throws Exception {
+    public void sendMessage(AsyncGroupRequest asyncRequest, MemberImpl member, MessageType messageType,
+            Message<?> message) throws Exception {
         SendRequest request = new SendRequest();
         PacketData data = getPacketData(messageType, message);
         asyncRequest.add(data.getMessageId(), request);
@@ -654,7 +633,6 @@
         data.setCorrelationId(new Buffer(correlationId));
         data.setReliable(true);
         Packet packet = new Packet(data);
-        
         this.broadcast.downStreamManagement(packet);
     }
 
@@ -745,7 +723,4 @@
             throw new BlazeRuntimeException("Not Initialized");
         }
     }
-
-   
-  
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java Tue Jan 27 07:30:16 2009
@@ -25,6 +25,8 @@
 public class BlazeGroupConfiguration extends BlazeConfiguration {
     private int heartBeatInterval = 800;
     private String unicastURI = "udp://localhost:0";
+    //reliability
+    private String reliableUnicast ="swp";
 
     /**
      * @return the heartBeatInterval
@@ -59,4 +61,18 @@
     public void setUnicastURI(String unicastURI) {
         this.unicastURI = unicastURI;
     }
+
+    /**
+     * @return the reliableUnicast
+     */
+    public String getReliableUnicast() {
+        return this.reliableUnicast;
+    }
+
+    /**
+     * @param reliableUnicast the reliableUnicast to set
+     */
+    public void setReliableUnicast(String reliableUnicast) {
+        this.reliableUnicast = reliableUnicast;
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java Tue Jan 27 07:30:16 2009
@@ -67,7 +67,6 @@
 
     /**
      * @return the Member of the Channel
-     * @throws Exception
      */
     public MemberImpl getLocalMember() {
         return this.channel.getLocalMember();
@@ -119,7 +118,7 @@
     public Set<MemberImpl> getMembersImpl() {
         return new HashSet<MemberImpl>(this.members.values());
     }
-    
+
     /**
      * @return the number of members in the group
      */
@@ -165,17 +164,18 @@
     public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException {
         Member result = null;
         long deadline = 0;
+        long waitTime = timeout;
         if (timeout > 0) {
             deadline = System.currentTimeMillis() + timeout;
         }
-        while (true) {
+        while (isStarted() && (timeout == 0 || waitTime > 0)) {
             result = getMemberByName(name);
             if (result == null) {
                 synchronized (this.members) {
                     this.members.wait(timeout);
                 }
                 if (timeout > 0) {
-                    timeout = (int) Math.max(deadline - System.currentTimeMillis(), 0l);
+                    waitTime = (int) Math.max(deadline - System.currentTimeMillis(), 0l);
                 }
             } else {
                 break;
@@ -185,94 +185,80 @@
     }
 
     /**
-     * @return
+     * 
      * @throws Exception
      * @see org.apache.activeblaze.Service#init()
      */
-    public boolean init() throws Exception {
-        boolean result = super.init();
-        if (result) {
-            ThreadFactory threadFactory = new ThreadFactory() {
-                public Thread newThread(Runnable r) {
-                    Thread thread = new Thread(r);
-                    thread.setDaemon(true);
-                    return thread;
-                }
-            };
-            this.listenerService = Executors.newCachedThreadPool(threadFactory);
-            this.members.put(this.channel.getId(), this.channel.getLocalMember());
-        }
-        return result;
+    public void doInit() throws Exception {
+        ThreadFactory threadFactory = new ThreadFactory() {
+            public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r);
+                thread.setDaemon(true);
+                return thread;
+            }
+        };
+        this.listenerService = Executors.newCachedThreadPool(threadFactory);
+        this.members.put(this.channel.getId(), this.channel.getLocalMember());
     }
 
     /**
-     * @return
+     * 
      * @throws Exception
      * @see org.apache.activeblaze.Service#shutDown()
      */
-    public boolean shutDown() throws Exception {
-        boolean result = super.shutDown();
+    public void doShutDown() throws Exception {
         this.members.clear();
         if (this.listenerService != null) {
             this.listenerService.shutdownNow();
         }
-        return result;
     }
 
     /**
-     * @return
+     * 
      * @throws Exception
      * @see org.apache.activeblaze.Service#start()
      */
-    public boolean start() throws Exception {
-        boolean result = super.start();
-        if (result) {
-            TimerTask heartbeat = new TimerTask() {
-                public void run() {
+    public void doStart() throws Exception {
+        TimerTask heartbeat = new TimerTask() {
+            public void run() {
+                try {
+                    broadcastHeartBeat(getLocalMember());
+                } catch (Exception e) {
+                    LOG.error("Failed to send heartbeat", e);
+                }
+            }
+        };
+        heartbeat.run();
+        int interval = this.configuration.getHeartBeatInterval() / 4;
+        this.heartBeatTimer = new Timer(true);
+        this.heartBeatTimer.scheduleAtFixedRate(heartbeat, interval, interval);
+        TimerTask checkMembership = new TimerTask() {
+            public void run() {
+                if (isStarted()) {
                     try {
-                        broadcastHeartBeat(getLocalMember());
+                        checkMembership();
                     } catch (Exception e) {
-                        LOG.error("Failed to send heartbeat", e);
+                        LOG.error("Failed to checkMembership", e);
                     }
                 }
-            };
-            heartbeat.run();
-            int interval = this.configuration.getHeartBeatInterval() / 4;
-            this.heartBeatTimer = new Timer(true);
-            this.heartBeatTimer.scheduleAtFixedRate(heartbeat, interval, interval);
-            TimerTask checkMembership = new TimerTask() {
-                public void run() {
-                    if (isStarted()) {
-                        try {
-                            checkMembership();
-                        } catch (Exception e) {
-                            LOG.error("Failed to checkMembership", e);
-                        }
-                    }
-                }
-            };
-            this.checkMemberShipTimer = new Timer(true);
-            this.checkMemberShipTimer.scheduleAtFixedRate(checkMembership, interval, interval / 2);
-        }
-        return result;
+            }
+        };
+        this.checkMemberShipTimer = new Timer(true);
+        this.checkMemberShipTimer.scheduleAtFixedRate(checkMembership, interval, interval / 2);
     }
 
     /**
-     * @return
+     * 
      * @throws Exception
      * @see org.apache.activeblaze.Service#stop()
      */
-    public boolean stop() throws Exception {
-        boolean result = super.stop();
-        if (result) {
-            if (this.heartBeatTimer != null) {
-                this.heartBeatTimer.cancel();
-            }
-            if (this.checkMemberShipTimer != null) {
-                this.checkMemberShipTimer.cancel();
-            }
+    public void doStop() throws Exception {
+        if (this.heartBeatTimer != null) {
+            this.heartBeatTimer.cancel();
+        }
+        if (this.checkMemberShipTimer != null) {
+            this.checkMemberShipTimer.cancel();
         }
-        return result;
     }
 
     public String toString() {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java Tue Jan 27 07:30:16 2009
@@ -24,7 +24,6 @@
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.transport.BaseTransport;
 import org.apache.activeblaze.impl.transport.TransportFactory;
-import org.apache.activemq.protobuf.Buffer;
 
 /**
  * Uses multicast to implement a Network
@@ -38,7 +37,7 @@
     private String name = "";
     private InetSocketAddress broadcastAddress;
     private InetSocketAddress managementAddress;
-    
+
     /**
      * @return the name
      */
@@ -75,76 +74,69 @@
      * @throws Exception
      * @see org.apache.activeblaze.Service#init()
      */
-    public boolean init() throws Exception {
-        boolean result = super.init();
-        if (result) {
-            this.broadcast = TransportFactory.get(this.uri);
-            this.broadcast.setName(getName() + "-Broadcast");
-            this.broadcast.setExceptionListener(this);
-            this.broadcast.setPrev(getPrev());
-            this.broadcast.setNext(getNext());
-            this.broadcastAddress = new InetSocketAddress(this.uri.getHost(), this.uri.getPort());
-            this.broadcast.init();
-            if (this.managementURI != null && !this.managementURI.equals(this.uri)) {
-                this.management = TransportFactory.get(this.managementURI);
-                this.management.setName(getName() + "-Management");
-                this.management.setExceptionListener(this);
-                this.management.setPrev(getPrev());
-                this.management.setNext(getNext());
-                this.managementAddress = new InetSocketAddress(this.managementURI.getHost(), this.managementURI
-                        .getPort());
-                this.management.init();
-            }
+    public void doInit() throws Exception {
+        super.doInit();
+        this.broadcast = TransportFactory.get(this.uri);
+        this.broadcast.setName(getName() + "-Broadcast");
+        this.broadcast.setExceptionListener(this);
+        this.broadcast.setPrev(getPrev());
+        this.broadcast.setNext(getNext());
+        this.broadcastAddress = new InetSocketAddress(this.uri.getHost(), this.uri.getPort());
+        this.broadcast.init();
+        if (this.managementURI != null && !this.managementURI.equals(this.uri)) {
+            this.management = TransportFactory.get(this.managementURI);
+            this.management.setName(getName() + "-Management");
+            this.management.setExceptionListener(this);
+            this.management.setPrev(getPrev());
+            this.management.setNext(getNext());
+            this.managementAddress = new InetSocketAddress(this.managementURI.getHost(), this.managementURI.getPort());
+            this.management.init();
         }
-        return result;
     }
 
     /**
-     * @return true if shutDown
+     * 
      * @throws Exception
      * @see org.apache.activeblaze.Service#shutDown()
      */
-    public boolean shutDown() throws Exception {
-        boolean result = super.shutDown();
+    public void doShutDown() throws Exception {
+        super.doShutDown();
         if (this.broadcast != null) {
             this.broadcast.shutDown();
         }
         if (this.management != null) {
             this.management.shutDown();
         }
-        return result;
     }
 
     /**
-     * @return true if started
+     * 
      * @throws Exception
      * @see org.apache.activeblaze.Service#start()
      */
-    public boolean start() throws Exception {
-        boolean result = super.start();
+    public void doStart() throws Exception {
+        super.doStart();
         if (this.broadcast != null) {
             this.broadcast.start();
         }
         if (this.management != null) {
             this.management.start();
         }
-        return result;
     }
 
     /**
-     * @return true if stopped
+     * 
      * @throws Exception
      * @see org.apache.activeblaze.Service#stop()
      */
-    public boolean stop() throws Exception {
-        boolean result = super.stop();
+    public void doStop() throws Exception {
+        super.doStop();
         if (this.broadcast != null) {
             this.broadcast.stop();
         }
         if (this.management != null) {
             this.management.stop();
         }
-        return result;
     }
 
     /**

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java Tue Jan 27 07:30:16 2009
@@ -104,76 +104,70 @@
     }
 
     /**
-     * @return true if initialized
+     * 
      * @throws Exception
      * @see org.apache.activeblaze.Service#init()
      */
-    public boolean init() throws Exception {
-        boolean result = super.init();
-        if (result) {
-            this.broadcast = createTransport(this.broadcastURIs);
-            this.broadcast.setName(getName() + "-Broadcast");
-            this.broadcast.setExceptionListener(this);
-            this.broadcast.setPrev(getPrev());
-            this.broadcast.setNext(getNext());
-            if (!this.managementURIs.isEmpty() && !this.managementURIs.equals(this.broadcastURIs)) {
-                this.management = createTransport(this.managementURIs);
-                this.management.setName(getName() + "-Management");
-                this.management.setExceptionListener(this);
-                this.management.setPrev(getPrev());
-                this.management.setNext(getNext());
-                this.management.init();
-            }
+    public void doInit() throws Exception {
+        super.doInit();
+        this.broadcast = createTransport(this.broadcastURIs);
+        this.broadcast.setName(getName() + "-Broadcast");
+        this.broadcast.setExceptionListener(this);
+        this.broadcast.setPrev(getPrev());
+        this.broadcast.setNext(getNext());
+        if (!this.managementURIs.isEmpty() && !this.managementURIs.equals(this.broadcastURIs)) {
+            this.management = createTransport(this.managementURIs);
+            this.management.setName(getName() + "-Management");
+            this.management.setExceptionListener(this);
+            this.management.setPrev(getPrev());
+            this.management.setNext(getNext());
+            this.management.init();
         }
-        return result;
     }
 
     /**
-     * @return true if shutDown
+     * 
      * @throws Exception
      * @see org.apache.activeblaze.Service#shutDown()
      */
-    public boolean shutDown() throws Exception {
-        boolean result = super.shutDown();
+    public void doShutDown() throws Exception {
+        super.doShutDown();
         if (this.broadcast != null) {
             this.broadcast.shutDown();
         }
         if (this.management != null) {
             this.management.shutDown();
         }
-        return result;
     }
 
     /**
-     * @return true if started
+     * 
      * @throws Exception
      * @see org.apache.activeblaze.Service#start()
      */
-    public boolean start() throws Exception {
-        boolean result = super.start();
+    public void doStart() throws Exception {
+        super.doStart();
         if (this.broadcast != null) {
             this.broadcast.start();
         }
         if (this.management != null) {
             this.management.start();
         }
-        return result;
     }
 
     /**
-     * @return true if stopped
+     * 
      * @throws Exception
      * @see org.apache.activeblaze.Service#stop()
      */
-    public boolean stop() throws Exception {
-        boolean result = super.stop();
+    public void doStop() throws Exception {
+        super.doStop();
         if (this.broadcast != null) {
             this.broadcast.stop();
         }
         if (this.management != null) {
             this.management.stop();
         }
-        return result;
     }
 
     /**

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java Tue Jan 27 07:30:16 2009
@@ -112,45 +112,38 @@
         this.prev = prev;
     }
 
-    public boolean init() throws Exception {
-        boolean result = super.init();
-        if (result && this.next != null) {
+    public void doInit() throws Exception {
+        if (this.next != null) {
             this.next.init();
         }
-        return result;
     }
 
-    public boolean shutDown() throws Exception {
-        boolean result = super.shutDown();
-        if (result && this.next != null) {
+    public void doShutDown() throws Exception {
+        if (this.next != null) {
             this.next.shutDown();
         }
-        return result;
     }
 
-    public boolean start() throws Exception {
-        boolean result = super.start();
-        if (result && this.next != null) {
+    public void doStart() throws Exception {
+        if (this.next != null) {
             this.next.start();
         }
-        return result;
     }
 
-    public boolean stop() throws Exception {
-        boolean result = super.stop();
-        if (result && this.next != null) {
+    public void doStop() throws Exception {
+        if (this.next != null) {
             this.next.stop();
         }
-        return result;
     }
-    
+
     /**
      * Send a management packet - this may be on a different address
+     * 
      * @param packet
      * @throws Exception
      */
-    public void downStreamManagement(Packet packet) throws Exception{
-        if (this.next!=null) {
+    public void downStreamManagement(Packet packet) throws Exception {
+        if (this.next != null) {
             this.next.downStreamManagement(packet);
         }
     }
@@ -208,9 +201,9 @@
      * @see org.apache.activeblaze.impl.processor.ChainedProcessor#getExceptionListener()
      */
     public ExceptionListener getExceptionListener() {
-       return this.exceptionListener;
+        return this.exceptionListener;
     }
-    
+
     /**
      * @return the maxPacketSize
      */
@@ -219,7 +212,8 @@
     }
 
     /**
-     * @param maxPacketSize the maxPacketSize to set
+     * @param maxPacketSize
+     *            the maxPacketSize to set
      */
     public void setMaxPacketSize(int maxPacketSize) {
         this.maxPacketSize = maxPacketSize;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java Tue Jan 27 07:30:16 2009
@@ -116,16 +116,29 @@
     }
 
     /**
+     * @return the message sequence
+     */
+    public long getMessageSequence() {
+        return this.packetData.getMessageSequence();
+    }
+
+    /**
      * @return the packetData
      */
     public PacketData getPacketData() {
         return this.packetData;
     }
 
+    
+    /**
+     * 
+     * @return a deep copy of <Code>this</Code>
+     * @see java.lang.Object#clone()
+     */
     public Packet clone() {
         Packet result = new Packet(this.id, this.packetData.clone());
-        result.to=this.to;
-        result.from=this.from;
+        result.to = this.to;
+        result.from = this.from;
         return result;
     }
 
@@ -137,6 +150,13 @@
     }
 
     /**
+     * @param address
+     */
+    public void setFrom(SocketAddress address) {
+        this.from = address;
+    }
+
+    /**
      * @return the to
      */
     public SocketAddress getTo() {
@@ -150,4 +170,27 @@
     public void setTo(SocketAddress to) {
         this.to = to;
     }
+
+    /**
+     * Is this Packet replayed
+     * 
+     * @return true if replayed
+     */
+    public boolean isReplayed() {
+        return this.packetData.getReplayed();
+    }
+
+    /**
+     * @return true if a response
+     */
+    public boolean isResponse() {
+        return this.packetData.getResponse();
+    }
+
+    /**
+     * @return true if response required
+     */
+    public boolean isResponseRequired() {
+        return this.packetData.getResponseRequired();
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java Tue Jan 27 07:30:16 2009
@@ -23,7 +23,6 @@
 import org.apache.activeblaze.wire.PacketData;
 import org.apache.activemq.protobuf.Buffer;
 
-
 /**
  * Checks for duplicates
  * 
@@ -33,28 +32,18 @@
     private int maxChannels = 256;
     private int maxAuditDepth = 1024;
 
-    public boolean shutDown() throws Exception {
-        boolean result = super.shutDown();
-        if (result) {
-            this.cache = null;
-        }
-        return result;
+    public void doShutDown() throws Exception {
+        this.cache = null;
     }
 
-    @SuppressWarnings("serial")
-    public boolean start() throws Exception {
-        boolean result = super.start();
-        if (result) {
-            if (this.cache == null) {
-                this.cache = new LinkedHashMap<Buffer, BitArrayBin>() {
-                    protected boolean removeEldestEntry(
-                            Map.Entry<Buffer, BitArrayBin> eldest) {
-                        return size() > getMaxChannels();
-                    }
-                };
-            }
+    public void doStart() throws Exception {
+        if (this.cache == null) {
+            this.cache = new LinkedHashMap<Buffer, BitArrayBin>() {
+                protected boolean removeEldestEntry(Map.Entry<Buffer, BitArrayBin> eldest) {
+                    return size() > getMaxChannels();
+                }
+            };
         }
-        return result;
     }
 
     /**
@@ -131,4 +120,22 @@
     public void setMaxAuditDepth(int maxAuditDepth) {
         this.maxAuditDepth = maxAuditDepth;
     }
+
+    /**
+     * @throws Exception
+     * @see org.apache.activeblaze.BaseService#doInit()
+     */
+    @Override
+    protected void doInit() throws Exception {
+        // TODO Auto-generated method stub
+    }
+
+    /**
+     * @throws Exception
+     * @see org.apache.activeblaze.BaseService#doStop()
+     */
+    @Override
+    protected void doStop() throws Exception {
+        // TODO Auto-generated method stub
+    }
 }

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReceivedPacket.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReceivedPacket.java?rev=738017&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReceivedPacket.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReceivedPacket.java Tue Jan 27 07:30:16 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable;
+
+import org.apache.activeblaze.impl.processor.Packet;
+
+/**
+ * @author rajdavies
+ *
+ */
+class ReceivedPacket extends ReliablePacket {
+
+    private final long timestamp;
+    /**
+     * Constructor
+     * @param packet 
+     */
+    ReceivedPacket(Packet packet) {
+        super(packet);
+        this.timestamp=System.currentTimeMillis();
+    }
+    /**
+     * @return the timestamp
+     */
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+    
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReceivedPacket.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableBuffer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableBuffer.java?rev=738017&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableBuffer.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableBuffer.java Tue Jan 27 07:30:16 2009
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.activeblaze.impl.processor.Packet;
+
+/**
+ * Holds a buffer of Packets to replay
+ * 
+ */
+public class ReliableBuffer {
+    Map<String, ReliablePacket> idMap = new HashMap<String, ReliablePacket>();
+    Map<Long, ReliablePacket> sequenceMap = new HashMap<Long, ReliablePacket>();
+    private int bufferSize = 0;
+    private ReliablePacket tail = null;
+    private ReliablePacket root = null;
+
+    /**
+     * @return the bufferSize
+     */
+    public synchronized int getBufferSize() {
+        return this.bufferSize;
+    }
+
+    /**
+     * @return the number of packets in the Buffer
+     */
+    public synchronized int size() {
+        return this.idMap.size();
+    }
+
+    /**
+     * @return true if empty
+     */
+    public synchronized boolean isEmpty() {
+        return this.idMap.isEmpty();
+    }
+
+    /**
+     * Add a Packet
+     * 
+     * @param p
+     */
+    public synchronized void addPacket(Packet p) {
+        ReliablePacket reliablePacket = new ReliablePacket(p);
+        if (this.idMap.put(p.getId(), reliablePacket) == null) {
+            this.sequenceMap.put(reliablePacket.getSequence(), reliablePacket);
+            this.bufferSize += p.getPacketData().serializedSizeFramed();
+            if (this.root == null) {
+                this.root = reliablePacket;
+            } else {
+                this.tail.linkAfter(reliablePacket);
+            }
+            this.tail = reliablePacket;
+        }
+    }
+
+    /**
+     * Add packet in order
+     * 
+     * @param p
+     */
+    public synchronized void addPacketInOrder(Packet p) {
+        ReliablePacket reliablePacket = new ReliablePacket(p);
+        if (this.idMap.put(p.getId(), reliablePacket) == null) {
+            this.sequenceMap.put(reliablePacket.getSequence(), reliablePacket);
+            this.bufferSize += p.getPacketData().serializedSizeFramed();
+            if (this.root == null) {
+                this.root = reliablePacket;
+                this.tail = reliablePacket;
+            } else {
+                if (this.root.getSequence().longValue() < reliablePacket.getSequence().longValue()) {
+                    this.root.linkBefore(reliablePacket);
+                    this.root = reliablePacket;
+                } else if (this.tail.getSequence().longValue() > reliablePacket.getSequence().longValue()) {
+                    ReliablePacket next = this.root;
+                    ReliablePacket last = next;
+                    while (next != null && reliablePacket.getSequence().longValue() > next.getSequence().longValue()) {
+                        last = next;
+                        next = (ReliablePacket) next.getNext();
+                    }
+                    last.linkAfter(reliablePacket);
+                    if (this.tail == last) {
+                        this.tail = reliablePacket;
+                    }
+                } else {
+                    this.tail.linkAfter(reliablePacket);
+                    this.tail = reliablePacket;
+                }
+            }
+        }
+    }
+
+    /**
+     * @return List of ordered Packets
+     */
+    public synchronized List<Packet> getOrderedBuffer() {
+        List<Packet> result = new ArrayList<Packet>();
+        if (size() > 1) {
+            long sequence = -1;
+            ReliablePacket next = this.root;
+            ReliablePacket last = this.root;
+            while (next != null) {
+                last = next;
+                next = (ReliablePacket) next.getNext();
+                if (sequence == -1) {
+                    sequence = last.getSequence().longValue();
+                    result.add(last.getPacket());
+                } else {
+                    if (next != null) {
+                        if (last.getSequence().longValue() + 1 == next.getSequence().longValue()) {
+                            result.add(last.getPacket());
+                        } else {
+                            result.clear();
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Get a Packet from the buffer
+     * 
+     * @param id
+     * @return the Packet
+     */
+    public synchronized Packet getPacket(String id) {
+        ReliablePacket reliablePacket = this.idMap.get(id);
+        return reliablePacket != null ? reliablePacket.getPacket() : null;
+    }
+
+    /**
+     * Get a Packet from the buffer
+     * 
+     * @param id
+     * @return the Packet
+     */
+    public synchronized Packet getPacket(long id) {
+        ReliablePacket reliablePacket = this.sequenceMap.get(new Long(id));
+        return reliablePacket != null ? reliablePacket.getPacket() : null;
+    }
+
+    /**
+     * Get the next Packet form the buffer
+     * 
+     * @param p
+     * @return the next Packet from the buffer
+     */
+    public synchronized Packet getNext(Packet p) {
+        ReliablePacket next = getNext(this.idMap.get(p.getId()));
+        return next != null ? next.getPacket() : null;
+    }
+
+    /**
+     * Get a list of Packetd from the buffer
+     * 
+     * @param start
+     * @param end
+     * @return the list of type <Code>Packet</Code>
+     */
+    public synchronized List<Packet> getPackets(long start, long end) {
+        List<Packet> result = new ArrayList<Packet>();
+        ReliablePacket reliablePacket = this.sequenceMap.get(new Long(start));
+        if (reliablePacket != null) {
+            result.add(reliablePacket.getPacket());
+            if (end >= start) {
+                while (reliablePacket != null) {
+                    reliablePacket = (ReliablePacket) reliablePacket.getNext();
+                    if (reliablePacket != null) {
+                        if (reliablePacket.getSequence().longValue() <= end) {
+                            result.add(reliablePacket.getPacket());
+                        } else {
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Remove a packet from the buffer
+     * 
+     * @param p
+     */
+    public synchronized void removePacket(Packet p) {
+        removePacket(p.getId());
+    }
+
+    /**
+     * Remove a packet from the buffer
+     * 
+     * @param id -
+     *            the id of the Packet
+     */
+    public synchronized void removePacket(String id) {
+        ReliablePacket reliablePacket = this.idMap.remove(id);
+        removeReliablePacket(reliablePacket);
+    }
+
+    /**
+     * Remove a Packet from the buffer
+     * 
+     * @param sequenceNumber
+     */
+    public synchronized void removePacket(long sequenceNumber) {
+        ReliablePacket reliablePacket = this.sequenceMap.remove(new Long(sequenceNumber));
+        removeReliablePacket(reliablePacket);
+    }
+
+    /**
+     * Remove Packets from the buffer
+     * 
+     * @param start
+     * @param end
+     */
+    public synchronized void removePackets(long start, long end) {
+        ReliablePacket packet = this.root;
+        while (packet != null) {
+            ReliablePacket remove = packet;
+            packet = getNext(packet);
+            long sequence = remove.getSequence().longValue();
+            if (start <= sequence && end >= sequence) {
+                removeReliablePacket(remove);
+            }
+            if (end < sequence) {
+                break;
+            }
+        }
+    }
+
+    /**
+     * clear the buffer
+     */
+    public synchronized void clear() {
+        this.idMap.clear();
+        this.sequenceMap.clear();
+        this.root = null;
+        this.tail = null;
+        this.bufferSize = 0;
+    }
+
+    private ReliablePacket getNext(ReliablePacket reliablePacket) {
+        ReliablePacket result = null;
+        if (reliablePacket != null) {
+            result = ((ReliablePacket) reliablePacket.getNext());
+        }
+        return result;
+    }
+
+    private void removeReliablePacket(ReliablePacket reliablePacket) {
+        if (reliablePacket != null) {
+            this.idMap.remove(reliablePacket.getId());
+            this.sequenceMap.remove(reliablePacket.getSequence());
+            if (reliablePacket == this.root) {
+                this.root = (ReliablePacket) reliablePacket.getNext();
+            }
+            if (reliablePacket == this.tail) {
+                this.tail = (ReliablePacket) reliablePacket.getPrevious();
+            }
+            reliablePacket.unlink();
+            this.bufferSize -= reliablePacket.getPacket().getPacketData().serializedSizeFramed();
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableBuffer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliablePacket.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliablePacket.java?rev=738017&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliablePacket.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliablePacket.java Tue Jan 27 07:30:16 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable;
+
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.util.LinkedNode;
+
+/**
+ * Wrapper for a Packet
+ * 
+ */
+class ReliablePacket extends LinkedNode {
+    private final Packet packet;
+    private final Long sequence;
+    private final String id;
+
+    ReliablePacket(Packet p) {
+        this.packet = p;
+        this.id = p.getId();
+        this.sequence = new Long(p.getMessageSequence());
+    }
+
+    Packet getPacket() {
+        return this.packet;
+    }
+
+    /**
+     * @return the sequence
+     */
+    public Long getSequence() {
+        return this.sequence;
+    }
+
+    /**
+     * @return the id
+     */
+    public String getId() {
+        return this.id;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliablePacket.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/package.html?rev=738017&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/package.html (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/package.html Tue Jan 27 07:30:16 2009
@@ -0,0 +1,25 @@
+!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+reliable protocols
+
+</body>
+</html>
\ No newline at end of file

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleFlow.java (from r734779, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleFlow.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleFlow.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java&r1=734779&r2=738017&rev=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/flow/SimpleFlow.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleFlow.java Tue Jan 27 07:30:16 2009
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activeblaze.impl.reliable.flow;
+package org.apache.activeblaze.impl.reliable.simple;
 
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.impl.processor.Packet;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java Tue Jan 27 07:30:16 2009
@@ -17,7 +17,6 @@
 package org.apache.activeblaze.impl.reliable.simple;
 
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
-import org.apache.activeblaze.impl.reliable.flow.SimpleFlow;
 
 /**
  * Very basic (none) reliability

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/package.html?rev=738017&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/package.html (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/package.html Tue Jan 27 07:30:16 2009
@@ -0,0 +1,25 @@
+!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+simple flow control
+
+</body>
+</html>
\ No newline at end of file



Mime
View raw message