activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r738017 [2/2] - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/cluster/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/network/ main/java/org/apache/a...
Date Tue, 27 Jan 2009 07:30:18 GMT
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java?rev=738017&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java Tue Jan 27 07:30:16 2009
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable.swp;
+
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.reliable.ReliableBuffer;
+import org.apache.activeblaze.wire.AckData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.NackData;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Consumer part of SWP
+ * 
+ */
+public class ConsumerProcessor {
+    static final Log LOG = LogFactory.getLog(ConsumerProcessor.class);
+    private static final long NOT_SET = -1l;
+    private int maxWindowSize = 16 * 1024;
+    private int rtt = 1000;
+    private long lastAckTime;
+    private long firstSequence = NOT_SET;
+    private long lastSequence = NOT_SET;
+    private int bufferSize;
+    private ReliableBuffer replayBuffer = new ReliableBuffer();
+    private final Lock lock = new ReentrantLock();
+    private final AtomicLong ackSequence = new AtomicLong();
+    private final SwpProcessor swp;
+    private final SocketAddress peerAddress;
+    private Packet lastAck = null;
+
+    ConsumerProcessor(SwpProcessor swp, SocketAddress peerAddress) {
+        this.swp = swp;
+        this.peerAddress = peerAddress;
+    }
+
+    void processInBound(Packet packet) throws Exception {
+        PacketData packetData = packet.getPacketData();
+        MessageType type = MessageType.valueOf(packetData.getType());
+        if (type == MessageType.CONTROL_DATA) {
+            if (this.replayBuffer.isEmpty()) {
+                // send back a control message
+            }
+            // our peer didn't receive our last ack
+            Packet resend = this.lastAck;
+            if (resend != null) {
+                this.swp.sendDownStream(resend);
+            }
+        } else {
+            long sequence = packet.getMessageSequence();
+            if (sequence == (this.lastSequence + 1) || this.lastSequence == NOT_SET) {
+                this.lock.lock();
+                try {
+                    if (this.lastSequence == NOT_SET) {
+                        this.firstSequence = sequence;
+                    }
+                    this.lastSequence = sequence;
+                    this.bufferSize += packet.getPacketData().serializedSizeFramed();
+                } finally {
+                    this.lock.unlock();
+                }
+                this.swp.sendUpStream(packet);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(this + " consumed packet " + packet.getMessageSequence() + " FROM: " + packet.getFrom());
+                }
+                if (this.bufferSize >= this.maxWindowSize) {
+                    sendAck();
+                }
+            } else {
+                LOG.debug(this + " Packet not in sequence " + packet.getMessageSequence() + " FROM: "
+                        + packet.getFrom() + " Last sequence is " + this.lastSequence);
+                // buffer this up and wait ...
+                this.replayBuffer.addPacketInOrder(packet);
+                List<Packet> ordered = null;
+                if (this.replayBuffer.size() > 1) {
+                    synchronized (this.replayBuffer) {
+                        ordered = this.replayBuffer.getOrderedBuffer();
+                        if (!ordered.isEmpty()) {
+                            this.replayBuffer.clear();
+                        }
+                    }
+                }
+                if (ordered != null && !ordered.isEmpty()) {
+                    for (Packet p : ordered) {
+                        this.swp.sendUpStream(p);
+                    }
+                } else if (!packet.isReplayed() && !this.replayBuffer.isEmpty()) {
+                    // request the sequence
+                    MessageType nackType = MessageType.NACK_DATA;
+                    NackData nack = (NackData) nackType.createMessage();
+                    this.lock.lock();
+                    try {
+                        nack.setStartSequence(this.lastSequence + 1);
+                        nack.setEndSequence(packet.getMessageSequence() - 1);
+                        nack.setSessionId(packet.getPacketData().getSessionId());
+                        nack.setId(this.ackSequence.incrementAndGet());
+                        PacketData pd = new PacketData();
+                        pd.setResponseRequired(false);
+                        pd.setPayload(nack.toFramedBuffer());
+                        pd.setType(nackType.getNumber());
+                        Packet nackPacket = new Packet(pd);
+                        nackPacket.setTo(this.peerAddress);
+                        this.swp.sendDownStream(nackPacket);
+                        LOG.debug(this + " Sending Nack: " + nack.getStartSequence() + " , " + nack.getEndSequence());
+                        this.lastAck = nackPacket;
+                    } finally {
+                        this.lock.unlock();
+                    }
+                    this.lastAckTime = System.currentTimeMillis();
+                }
+            }
+        }
+    }
+
+    /**
+     * 
+     * @param timeStamp
+     * @return if still valid
+     * @throws Exception
+     */
+    boolean control(long timeStamp) throws Exception {
+        boolean result = false;
+        if (this.lastAckTime + this.rtt < timeStamp || this.bufferSize >= this.maxWindowSize) {
+            sendAck();
+        }
+        return result;
+    }
+
+    void sendAck() throws Exception {
+        Packet ackPacket = null;
+        this.lock.lock();
+        try {
+            this.bufferSize = 0;
+            MessageType type = MessageType.ACK_DATA;
+            AckData ack = (AckData) type.createMessage();
+            ack.setStartSequence(this.firstSequence);
+            ack.setEndSequence(this.lastSequence);
+            ack.setId(this.ackSequence.incrementAndGet());
+            PacketData pd = new PacketData();
+            pd.setResponseRequired(false);
+            pd.setPayload(ack.toFramedBuffer());
+            pd.setType(type.getNumber());
+            ackPacket = new Packet(pd);
+            ackPacket.setTo(this.peerAddress);
+            this.lastAckTime = System.currentTimeMillis();
+            this.firstSequence = this.lastSequence;
+            LOG.debug(this + " Sent Ack " + ack);
+        } finally {
+            this.lock.unlock();
+        }
+        this.swp.sendDownStream(ackPacket);
+        this.lastAck = ackPacket;
+    }
+
+    public String toString() {
+        return "ConsumerProcessor(" + this.peerAddress + ")";
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java?rev=738017&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java Tue Jan 27 07:30:16 2009
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable.swp;
+
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.activeblaze.BlazeNoRouteException;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.reliable.ReliableBuffer;
+import org.apache.activeblaze.wire.AckData;
+import org.apache.activeblaze.wire.ControlData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.NackData;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * state on a request
+ * 
+ */
+public class ProducerProcessor {
+    static final Log LOG = LogFactory.getLog(ProducerProcessor.class);
+    private static final long NOT_SET = -1l;
+    private int maxWindowSize = 32 * 1024;
+    private int rtt = 5000;
+    private ReliableBuffer replayBuffer = new ReliableBuffer();
+    private final Lock lock = new ReentrantLock();
+    private final Condition full = this.lock.newCondition();
+    private final SwpProcessor swp;
+    private final SocketAddress peerAddress;
+    private final int sessionId;
+    private long lastAckId = NOT_SET;
+    private long lastAckTime = NOT_SET;
+    private AtomicLong sendSequence = new AtomicLong(1);
+
+    ProducerProcessor(SwpProcessor swp, SocketAddress peerAddress, int sessionId) {
+        this.swp = swp;
+        this.peerAddress = peerAddress;
+        this.sessionId = sessionId;
+    }
+
+    /**
+     * blocks until it can send the packet
+     * 
+     * @param packet
+     * @throws BlazeNoRouteException
+     * 
+     */
+    void processOutbound(final Packet packet) throws BlazeNoRouteException {
+        packet.getPacketData().setSessionId(this.sessionId);
+        packet.getPacketData().setMessageSequence(this.sendSequence.incrementAndGet());
+        this.lock.lock();
+        try {
+            this.replayBuffer.addPacket(packet);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(this + " Sending " + packet.getMessageSequence() + "  - buffer size = "
+                        + this.replayBuffer.size());
+            }
+            int windowSize = this.replayBuffer.getBufferSize();
+            if (windowSize >= this.maxWindowSize) {
+                if (!this.full.await(this.rtt, TimeUnit.MILLISECONDS)) {
+                    throw new BlazeNoRouteException("No route to " + packet.getTo());
+                }
+            }
+        } catch (InterruptedException e) {
+        } finally {
+            this.lock.unlock();
+        }
+    }
+
+    Packet processInbound(Packet packet) throws Exception {
+        Packet result = null;
+        PacketData data = packet.getPacketData();
+        if (data != null) {
+            MessageType type = MessageType.valueOf(data.getType());
+            if (type == MessageType.ACK_DATA) {
+                AckData ackData = (AckData) type.createMessage();
+                ackData.mergeFramed(data.getPayload());
+                long start = ackData.getStartSequence();
+                long end = ackData.getEndSequence();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(this + " Got Ack = " + ackData.getId() + ": " + ackData.getStartSequence() + ","
+                            + ackData.getEndSequence() + " [" + this.replayBuffer.size() + "]");
+                }
+                this.replayBuffer.removePackets(start, end);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(this + " Processed Ack = " + ackData.getId() + ": " + ackData.getStartSequence() + ","
+                            + ackData.getEndSequence() + " [" + this.replayBuffer.size() + "]");
+                }
+                this.lastAckId = ackData.getId();
+                this.lastAckTime = System.currentTimeMillis();
+                if (this.replayBuffer.getBufferSize() < this.maxWindowSize) {
+                    this.lock.lock();
+                    try {
+                        this.full.signalAll();
+                    } finally {
+                        this.lock.unlock();
+                    }
+                }
+            } else if (type == MessageType.NACK_DATA) {
+                this.lastAckTime = System.currentTimeMillis();
+                NackData nackData = (NackData) type.createMessage();
+                nackData.mergeFramed(data.getPayload());
+                this.lastAckId = nackData.getId();
+                LOG.debug(this + " Got Nack = " + nackData);
+                // lookup any missed messages
+                long start = nackData.getStartSequence();
+                long end = nackData.getEndSequence();
+                List<Packet> list = this.replayBuffer.getPackets(start, end);
+                LOG.debug(this + " Replaying " + list);
+                for (Packet p : list) {
+                    this.swp.sendDownStream(p);
+                }
+            } else {
+                result = packet;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * 
+     * @param timeStamp
+     * @return true if still valid
+     * @throws Exception
+     */
+    boolean control(long timeStamp) throws Exception {
+        boolean result = false;
+        if ((this.lastAckTime + (this.rtt / 2)) < timeStamp) {
+            // send a control message
+            Packet ackPacket = null;
+            this.lock.lock();
+            try {
+                MessageType type = MessageType.CONTROL_DATA;
+                ControlData control = (ControlData) type.createMessage();
+                control.setLastId(this.lastAckId);
+                PacketData pd = new PacketData();
+                pd.setResponseRequired(false);
+                pd.setPayload(control.toFramedBuffer());
+                pd.setType(type.getNumber());
+                ackPacket = new Packet(pd);
+                ackPacket.setTo(this.peerAddress);
+                LOG.debug(this + " Sent Control message " + control);
+            } finally {
+                this.lock.unlock();
+            }
+            this.swp.sendDownStream(ackPacket);
+        } else if (this.lastAckTime + (this.rtt * 2) < timeStamp) {
+            // no longer valid
+            LOG.debug(this + " Not valid: Last AckTime " + this.lastAckTime + " , " + this.rtt + " , " + timeStamp);
+            result = true;
+        }
+        return result;
+    }
+
+    public String toString() {
+        return "ProducerProcessor(" + this.peerAddress + ")";
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java?rev=738017&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java Tue Jan 27 07:30:16 2009
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable.swp;
+
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.util.LRUCache;
+import org.apache.activeblaze.util.SendRequest;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This is a sliding window protocol for unicast reliability
+ * 
+ */
+public class SwpProcessor extends DefaultChainedProcessor {
+    static final Log LOG = LogFactory.getLog(SwpProcessor.class);
+    private Map<Buffer, SendRequest> messageRequests;
+    private int maxConcurrentRequests = 1000;
+    private int maxWindowSize = 16 * 1024;
+    private int windowSize = 0;
+    private int rtt = 1000;
+    private Timer statusTimer;
+    protected AtomicInteger session = new AtomicInteger();
+    Map<SocketAddress, ProducerProcessor> producers = new ConcurrentHashMap<SocketAddress, ProducerProcessor>();
+    Map<SocketAddress, ConsumerProcessor> consumers = new ConcurrentHashMap<SocketAddress, ConsumerProcessor>();
+
+    public void downStream(final Packet p) throws Exception {
+        Packet packet = p.clone();
+        ProducerProcessor producer = getProducer(packet.getTo());
+        producer.processOutbound(packet);
+        sendDownStream(packet);
+    }
+
+    public void upStream(Packet packet) throws Exception {
+        ProducerProcessor producer = null;
+        synchronized (this.producers) {
+            producer = this.producers.get(packet.getFrom());
+        }
+        Packet packetToProcess = packet;
+        if (producer != null) {
+            packetToProcess = producer.processInbound(packet);
+        }
+        if (packetToProcess != null) {
+            ConsumerProcessor consumer = getConsumer(packet.getFrom());
+            consumer.processInBound(packet);
+        }
+    }
+
+    public void doInit() throws Exception {
+        super.doInit();
+        this.messageRequests = new LRUCache<Buffer, SendRequest>(this.maxConcurrentRequests);
+    }
+
+    public void doStart() throws Exception {
+        super.doStart();
+        TimerTask controlTask = new TimerTask() {
+            public void run() {
+                try {
+                    long timstamp = System.currentTimeMillis();
+                    for (Map.Entry<SocketAddress, ProducerProcessor> e : SwpProcessor.this.producers.entrySet()) {
+                        boolean result = e.getValue().control(timstamp);
+                        if (result) {
+                            LOG.info("Peer " + e.getValue() + "Not valid - removing");
+                            SwpProcessor.this.producers.remove(e.getKey());
+                            SwpProcessor.this.consumers.remove(e.getKey());
+                        }
+                    }
+                    for (Map.Entry<SocketAddress, ConsumerProcessor> e : SwpProcessor.this.consumers.entrySet()) {
+                        boolean result = e.getValue().control(timstamp);
+                        if (result) {
+                            LOG.info("Peer " + e.getValue() + "Not valid - removing");
+                            SwpProcessor.this.producers.remove(e.getKey());
+                            SwpProcessor.this.consumers.remove(e.getKey());
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.error("Failed to send heartbeat", e);
+                }
+            }
+        };
+        int interval = this.rtt / 4;
+        this.statusTimer = new Timer(true);
+        this.statusTimer.scheduleAtFixedRate(controlTask, interval, interval);
+    }
+
+    public void doStop() throws Exception {
+        super.doStop();
+        if (this.statusTimer != null) {
+            this.statusTimer.cancel();
+        }
+    }
+
+    void sendDownStream(Packet packet) throws Exception {
+        super.downStream(packet);
+    }
+
+    void sendUpStream(Packet packet) throws Exception {
+        super.upStream(packet);
+    }
+
+    /**
+     * @return the messageRequests
+     */
+    public Map<Buffer, SendRequest> getMessageRequests() {
+        return this.messageRequests;
+    }
+
+    /**
+     * @param messageRequests
+     *            the messageRequests to set
+     */
+    public void setMessageRequests(Map<Buffer, SendRequest> messageRequests) {
+        this.messageRequests = messageRequests;
+    }
+
+    /**
+     * @return the maxConcurrentRequests
+     */
+    public int getMaxConcurrentRequests() {
+        return this.maxConcurrentRequests;
+    }
+
+    /**
+     * @param maxConcurrentRequests
+     *            the maxConcurrentRequests to set
+     */
+    public void setMaxConcurrentRequests(int maxConcurrentRequests) {
+        this.maxConcurrentRequests = maxConcurrentRequests;
+    }
+
+    /**
+     * @return the maxWindowSize
+     */
+    public int getMaxWindowSize() {
+        return this.maxWindowSize;
+    }
+
+    /**
+     * @param maxWindowSize
+     *            the maxWindowSize to set
+     */
+    public void setMaxWindowSize(int maxWindowSize) {
+        this.maxWindowSize = maxWindowSize;
+    }
+
+    /**
+     * @return the windowSize
+     */
+    public int getWindowSize() {
+        return this.windowSize;
+    }
+
+    /**
+     * @param windowSize
+     *            the windowSize to set
+     */
+    public void setWindowSize(int windowSize) {
+        this.windowSize = windowSize;
+    }
+
+    private ProducerProcessor getProducer(SocketAddress peer) {
+        ProducerProcessor result = null;
+        synchronized (this.producers) {
+            result = this.producers.get(peer);
+            if (result == null) {
+                result = new ProducerProcessor(this, peer, this.session.incrementAndGet());
+                this.producers.put(peer, result);
+            }
+        }
+        return result;
+    }
+
+    private ConsumerProcessor getConsumer(SocketAddress peer) {
+        ConsumerProcessor result = this.consumers.get(peer);
+        if (result == null) {
+            result = new ConsumerProcessor(this, peer);
+            this.consumers.put(peer, result);
+        }
+        return result;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/package.html?rev=738017&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/package.html (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/package.html Tue Jan 27 07:30:16 2009
@@ -0,0 +1,25 @@
+!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Sliding Window Protocol for unicast
+
+</body>
+</html>
\ No newline at end of file

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Tue Jan 27 07:30:16 2009
@@ -18,7 +18,6 @@
 
 import java.net.URI;
 import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.activeblaze.BlazeConfiguration;
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.processor.PacketAudit;
 import org.apache.activemq.protobuf.Buffer;
@@ -45,57 +44,45 @@
     private LinkedBlockingQueue<Packet> dispatchQueue;
     private Thread dispatchQueueThread;
 
-    public boolean init() throws Exception {
-        boolean result = super.init();
-        if (result) {
-            this.audit.init();
-            if (this.localURI != null) {
-                this.bufferOfLocalURI = new Buffer(this.localURI.toString());
-            }
-            this.dispatchQueue = new LinkedBlockingQueue<Packet>(getMaxDispatchQueueSize());
+    public void doInit() throws Exception {
+        super.doInit();
+        this.audit.init();
+        if (this.localURI != null) {
+            this.bufferOfLocalURI = new Buffer(this.localURI.toString());
         }
-        return result;
+        this.dispatchQueue = new LinkedBlockingQueue<Packet>(getMaxDispatchQueueSize());
     }
 
-    public boolean shutDown() throws Exception {
-        boolean result = super.shutDown();
-        if (result) {
-            this.audit.shutDown();
-        }
-        return result;
+    public void doShutDown() throws Exception {
+        super.doShutDown();
+        this.audit.shutDown();
     }
 
-    public boolean start() throws Exception {
-        boolean result = super.start();
-        if (result) {
-            this.audit.start();
-            Runnable runable = new Runnable() {
-                public void run() {
-                    while (isStarted()) {
-                        dequeuePackets();
-                    }
+    public void doStart() throws Exception {
+        super.doStart();
+        this.audit.start();
+        Runnable runable = new Runnable() {
+            public void run() {
+                while (isStarted()) {
+                    dequeuePackets();
                 }
-            };
-            this.dispatchQueueThread = new Thread(runable, getLocalURI() + "-DispatchQueue");
-            this.dispatchQueueThread.setDaemon(true);
-            this.dispatchQueueThread.start();
-        }
-        return result;
+            }
+        };
+        this.dispatchQueueThread = new Thread(runable, getLocalURI() + "-DispatchQueue");
+        this.dispatchQueueThread.setDaemon(true);
+        this.dispatchQueueThread.start();
     }
 
-    public boolean stop() throws Exception {
-        boolean result = super.stop();
-        if (result) {
-            this.audit.stop();
-            if (this.dispatchQueueThread != null) {
-                this.dispatchQueueThread.interrupt();
-                try {
-                    this.dispatchQueueThread.join(100);
-                } catch (InterruptedException e) {
-                }
+    public void doStop() throws Exception {
+        super.doStop();
+        this.audit.stop();
+        if (this.dispatchQueueThread != null) {
+            this.dispatchQueueThread.interrupt();
+            try {
+                this.dispatchQueueThread.join(100);
+            } catch (InterruptedException e) {
             }
         }
-        return result;
     }
 
     /**
@@ -115,6 +102,7 @@
             this.bufferOfLocalURI = new Buffer(this.localURI.toString());
         }
     }
+
     /**
      * @return the bufferSize
      */
@@ -237,7 +225,7 @@
     public void setMaxDispatchQueueSize(int maxDispatchQueueSize) {
         this.maxDispatchQueueSize = maxDispatchQueueSize;
     }
-    
+
     public void upStream(Packet packet) throws Exception {
         if (!isStopped()) {
             this.dispatchQueue.put(packet);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java Tue Jan 27 07:30:16 2009
@@ -35,41 +35,35 @@
     private String networkInterface;
     private InetSocketAddress socketAddress;
 
-    public boolean init() throws Exception {
-        boolean result = super.init();
-        if (result) {
-            this.socket = new MulticastSocket(getLocalURI().getPort());
-            this.socket.setTimeToLive(getTimeToLive());
-            this.socket.setLoopbackMode(isLoopBack());
-            this.socket.setSoTimeout(getSoTimeout());
-            this.socket.setReceiveBufferSize(getBufferSize());
-            this.socket.setSendBufferSize(getBufferSize());
-            this.socketAddress = new InetSocketAddress(InetAddress.getByName(getLocalURI().getHost()), getLocalURI()
-                    .getPort());
-            NetworkInterface ni = null;
-            if (getNetworkInterface() != null && getNetworkInterface().length() > 0) {
-                ni = NetworkInterface.getByName(getNetworkInterface());
-                if (ni == null) {
-                    throw new BlazeException("Couldn't find an network interface named " + getNetworkInterface());
-                }
-            }
-            if (ni != null) {
-                this.socket.joinGroup(this.socketAddress, ni);
-            } else {
-                this.socket.joinGroup(this.socketAddress.getAddress());
+    public void doInit() throws Exception {
+        super.doInit();
+        this.socket = new MulticastSocket(getLocalURI().getPort());
+        this.socket.setTimeToLive(getTimeToLive());
+        this.socket.setLoopbackMode(isLoopBack());
+        this.socket.setSoTimeout(getSoTimeout());
+        this.socket.setReceiveBufferSize(getBufferSize());
+        this.socket.setSendBufferSize(getBufferSize());
+        this.socketAddress = new InetSocketAddress(InetAddress.getByName(getLocalURI().getHost()), getLocalURI()
+                .getPort());
+        NetworkInterface ni = null;
+        if (getNetworkInterface() != null && getNetworkInterface().length() > 0) {
+            ni = NetworkInterface.getByName(getNetworkInterface());
+            if (ni == null) {
+                throw new BlazeException("Couldn't find an network interface named " + getNetworkInterface());
             }
         }
-        return result;
+        if (ni != null) {
+            this.socket.joinGroup(this.socketAddress, ni);
+        } else {
+            this.socket.joinGroup(this.socketAddress.getAddress());
+        }
     }
 
-    public boolean shutDown() throws Exception {
-        boolean result = super.shutDown();
-        if (result) {
-            if (this.socket != null) {
-                this.socket.close();
-            }
+    public void doShutDown() throws Exception {
+        super.doShutDown();
+        if (this.socket != null) {
+            this.socket.close();
         }
-        return result;
     }
 
     protected void doProcess() throws Exception {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java Tue Jan 27 07:30:16 2009
@@ -23,51 +23,42 @@
 
 /**
  * Thread associated with processing
- *
+ * 
  */
 public abstract class ThreadChainedProcessor extends DefaultChainedProcessor implements Runnable {
     private static final Log LOG = LogFactory.getLog(ThreadChainedProcessor.class);
-    private int priority=Thread.NORM_PRIORITY;
+    private int priority = Thread.NORM_PRIORITY;
     private boolean daemon;
     private String name;
     private Thread thread;
-  
-    
-    
-    public boolean start() throws Exception {
-        boolean result = super.start();
-        if(result) {
-            String name = getName() != null ? getName() : toString();
-            this.thread = new Thread(this,name);
-            this.thread.setDaemon(isDaemon());
-            this.thread.setPriority(getPriority());
-            this.thread.start();
-        }
-        return result;
+
+    public void doStart() throws Exception {
+        super.doStart();
+        String name = getName() != null ? getName() : toString();
+        this.thread = new Thread(this, name);
+        this.thread.setDaemon(isDaemon());
+        this.thread.setPriority(getPriority());
+        this.thread.start();
     }
-    
-    public boolean stop() throws Exception {
-        boolean result = super.stop();
-        if(result) {
-           if (this.thread != null) {
-               try {
-                   this.thread.interrupt();
+
+    public void doStop() throws Exception {
+        super.doStop();
+        if (this.thread != null) {
+            try {
+                this.thread.interrupt();
                 this.thread.join(250);
             } catch (InterruptedException e) {
             }
-           }
         }
-        return result;
     }
-    
+
     public void run() {
         while (isStarted()) {
             try {
                 doProcess();
-            } catch(SocketTimeoutException ste) {
-                //normal - ignore
-            }
-            catch (Exception e) {
+            } catch (SocketTimeoutException ste) {
+                // normal - ignore
+            } catch (Exception e) {
                 try {
                     boolean stopped = isStopped();
                     super.stop();
@@ -80,9 +71,10 @@
             }
         }
     }
-    
+
     /**
      * Process input for the Processor
+     * 
      * @throws Exception
      */
     protected abstract void doProcess() throws Exception;
@@ -93,32 +85,40 @@
     public int getPriority() {
         return this.priority;
     }
+
     /**
-     * @param priority the priority to set
+     * @param priority
+     *            the priority to set
      */
     public void setPriority(int priority) {
         this.priority = priority;
     }
+
     /**
      * @return the daemon
      */
     public boolean isDaemon() {
         return this.daemon;
     }
+
     /**
-     * @param daemon the daemon to set
+     * @param daemon
+     *            the daemon to set
      */
     public void setDaemon(boolean daemon) {
         this.daemon = daemon;
     }
+
     /**
      * @return the name
      */
     public String getName() {
         return this.name;
     }
+
     /**
-     * @param name the name to set
+     * @param name
+     *            the name to set
      */
     public void setName(String name) {
         this.name = name;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Tue Jan 27 07:30:16 2009
@@ -46,46 +46,40 @@
     private ByteBuffer outBuffer;
     private Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(1000);
 
-    public boolean init() throws Exception {
-        boolean result = super.init();
-        if (result) {
-            this.channel = DatagramChannel.open();
-            DatagramSocket socket = this.channel.socket();
-            SocketAddress address = null;
-            if (getLocalURI() != null) {
-                address = new InetSocketAddress(getLocalURI().getHost(), getLocalURI().getPort());
-            } else {
-                throw new BlazeException("localURI not set");
-            }
-            socket.setBroadcast(isBroadcast());
-            socket.setReceiveBufferSize(getBufferSize());
-            socket.setSendBufferSize(getBufferSize());
-            socket.setSoTimeout(getSoTimeout());
-            this.channel.configureBlocking(true);
-            socket.bind(address);
-            // if the port was 0 - the port will be allocated automatically -
-            // so need to reset the local uri
-            URI oldURI = getLocalURI();
-            URI newURI = new URI(oldURI.getScheme(), oldURI.getUserInfo(), oldURI.getHost(), socket.getLocalPort(),
-                    oldURI.getPath(), oldURI.getQuery(), oldURI.getFragment());
-            setLocalURI(newURI);
-            this.inBuffer = ByteBuffer.allocateDirect(getMaxPacketSize());
-            this.outBuffer = ByteBuffer.allocateDirect(getMaxPacketSize());
+    public void doInit() throws Exception {
+        super.doInit();
+        this.channel = DatagramChannel.open();
+        DatagramSocket socket = this.channel.socket();
+        SocketAddress address = null;
+        if (getLocalURI() != null) {
+            address = new InetSocketAddress(getLocalURI().getHost(), getLocalURI().getPort());
+        } else {
+            throw new BlazeException("localURI not set");
         }
-        return result;
+        socket.setBroadcast(isBroadcast());
+        socket.setReceiveBufferSize(getBufferSize());
+        socket.setSendBufferSize(getBufferSize());
+        socket.setSoTimeout(getSoTimeout());
+        this.channel.configureBlocking(true);
+        socket.bind(address);
+        // if the port was 0 - the port will be allocated automatically -
+        // so need to reset the local uri
+        URI oldURI = getLocalURI();
+        URI newURI = new URI(oldURI.getScheme(), oldURI.getUserInfo(), oldURI.getHost(), socket.getLocalPort(), oldURI
+                .getPath(), oldURI.getQuery(), oldURI.getFragment());
+        setLocalURI(newURI);
+        this.inBuffer = ByteBuffer.allocateDirect(getMaxPacketSize());
+        this.outBuffer = ByteBuffer.allocateDirect(getMaxPacketSize());
     }
 
-    public boolean shutDown() throws Exception {
-        boolean result = super.shutDown();
-        if (result) {
-            if (this.channel != null) {
-                this.channel.close();
-                this.inBuffer = null;
-                this.outBuffer = null;
-                this.channel = null;
-            }
+    public void doShutDown() throws Exception {
+        super.doShutDown();
+        if (this.channel != null) {
+            this.channel.close();
+            this.inBuffer = null;
+            this.outBuffer = null;
+            this.channel = null;
         }
-        return result;
     }
 
     protected void doProcess() throws Exception {
@@ -105,20 +99,15 @@
                             request.put(data.getMessageId(), data);
                         }
                     }
-                } else {
-                    if (data.getResponseRequired()) {
-                        PacketData pd = new PacketData();
-                        pd.setResponseRequired(false);
-                        pd.setCorrelationId(data.getMessageId());
-                        pd.setResponse(true);
-                        Packet packet = new Packet(pd);
-                        packet.setTo(address);
-                        downStream(packet);
-                    }
-                    Packet packet = new Packet(address, data);
-                    if (!isEnableAudit() || !this.audit.isDuplicate(packet)) {
-                        upStream(packet);
-                    }
+                }
+                if (data.getResponseRequired()) {
+                    Packet packet = createAckPacket(data);
+                    packet.setTo(address);
+                    downStream(packet);
+                }
+                Packet packet = new Packet(address, data);
+                if (!isEnableAudit() || !this.audit.isDuplicate(packet)) {
+                    upStream(packet);
                 }
             }
             buffer.clear();
@@ -129,7 +118,7 @@
         ByteBuffer buffer = this.outBuffer;
         if (isStarted()) {
             SendRequest request = null;
-            if (packet.getPacketData().getResponseRequired()) {
+            if (packet.isResponseRequired()) {
                 synchronized (this.messageRequests) {
                     request = new SendRequest();
                     this.messageRequests.put(packet.getPacketData().getMessageId(), request);
@@ -153,7 +142,23 @@
                 }
             }
         } else {
-            throw new BlazeException(this + " " + outBuffer + " Not started - trying to downStream " + packet);
+            throw new BlazeException(this + " Not started - cannot send " + packet);
         }
     }
+
+    private Packet createAckPacket(PacketData data) {
+        MessageType type = MessageType.ACK_DATA;
+        AckData ackData = (AckData) type.createMessage();
+        ackData.setSessionId(data.getSessionId());
+        ackData.setStartSequence(data.getMessageSequence());
+        ackData.setEndSequence(data.getMessageSequence());
+        PacketData pd = new PacketData();
+        pd.setResponseRequired(false);
+        pd.setCorrelationId(data.getMessageId());
+        pd.setResponse(true);
+        pd.setPayload(ackData.toFramedBuffer());
+        pd.setType(type.getNumber());
+        Packet packet = new Packet(pd);
+        return packet;
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java Tue Jan 27 07:30:16 2009
@@ -99,7 +99,9 @@
      */
 
     public synchronized String generateId() {
-        return this.seed + (this.sequence++);
+        StringBuilder builder = new StringBuilder(this.seed.length() + 32);
+        builder.append(this.seed).append(":").append(this.sequence++);
+        return builder.toString();
     }
 
     /**

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LinkedNode.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LinkedNode.java?rev=738017&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LinkedNode.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LinkedNode.java Tue Jan 27 07:30:16 2009
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+/**
+ * Provides a base class for you to extend when you want object to maintain a
+ * doubly linked list to other objects without using a collection class.
+ * 
+ * @author chirino
+ */
+public class LinkedNode {
+
+    protected LinkedNode next = this;
+    protected LinkedNode prev = this;
+    protected boolean tail = true;
+
+    public LinkedNode getHeadNode() {
+        if (isHeadNode()) {
+            return this;
+        }
+        if (isTailNode()) {
+            return next;
+        }
+        LinkedNode rc = prev;
+        while (!rc.isHeadNode()) {
+            rc = rc.prev;
+        }
+        return rc;
+    }
+
+    public LinkedNode getTailNode() {
+        if (isTailNode()) {
+            return this;
+        }
+        if (isHeadNode()) {
+            return prev;
+        }
+        LinkedNode rc = next;
+        while (!rc.isTailNode()) {
+            rc = rc.next;
+        }
+        return rc;
+    }
+
+    public LinkedNode getNext() {
+        return tail ? null : next;
+    }
+
+    public LinkedNode getPrevious() {
+        return prev.tail ? null : prev;
+    }
+
+    public boolean isHeadNode() {
+        return prev.isTailNode();
+    }
+
+    public boolean isTailNode() {
+        return tail;
+    }
+
+    /**
+     * @param rightHead the node to link after this node.
+     * @return this
+     */
+    public LinkedNode linkAfter(LinkedNode rightHead) {
+
+        if (rightHead == this) {
+            throw new IllegalArgumentException("You cannot link to yourself");
+        }
+        if (!rightHead.isHeadNode()) {
+            throw new IllegalArgumentException("You only insert nodes that are the first in a list");
+        }
+
+        LinkedNode rightTail = rightHead.prev;
+
+        if (tail) {
+            tail = false;
+        } else {
+            rightTail.tail = false;
+        }
+
+        rightHead.prev = this; // link the head of the right side.
+        rightTail.next = next; // link the tail of the right side
+        next.prev = rightTail; // link the head of the left side
+        next = rightHead; // link the tail of the left side.
+
+        return this;
+    }
+
+    /**
+     * @param leftHead the node to link after this node.
+     * @return
+     * @return this
+     */
+    public LinkedNode linkBefore(LinkedNode leftHead) {
+
+        if (leftHead == this) {
+            throw new IllegalArgumentException("You cannot link to yourself");
+        }
+        if (!leftHead.isHeadNode()) {
+            throw new IllegalArgumentException("You only insert nodes that are the first in a list");
+        }
+
+        // The left side is no longer going to be a tail..
+        LinkedNode leftTail = leftHead.prev;
+        leftTail.tail = false;
+
+        leftTail.next = this; // link the tail of the left side.
+        leftHead.prev = prev; // link the head of the left side.
+        prev.next = leftHead; // link the tail of the right side.
+        prev = leftTail; // link the head of the right side.
+
+        return leftHead;
+    }
+
+    /**
+     * Removes this node out of the linked list it is chained in.
+     */
+    public void unlink() {
+        // If we are allready unlinked...
+        if (prev == this) {
+            reset();
+            return;
+        }
+
+        if (tail) {
+            prev.tail = true;
+        }
+
+        // Update the peers links..
+        next.prev = prev;
+        prev.next = next;
+
+        // Update our links..
+        reset();
+    }
+    
+    public void reset() {
+        next = this;
+        prev = this;
+        tail = true;
+    }
+
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LinkedNode.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Tue Jan 27 07:30:16 2009
@@ -31,31 +31,51 @@
   MEMBER_DATA = 1;
   ELECTION_MESSAGE = 2;
   ACK_DATA = 3;
-  STATE_DATA = 4;
+  NACK_DATA = 4;
+  STATE_DATA = 5;
+  CONTROL_DATA =6;
 }
     message PacketData {   
       optional bool responseRequired = 1;
       optional bool reliable = 2;
       optional bool response = 3;
-      optional int32 type =4;  
-	    optional bytes producerId = 5;
-	    optional int64 sessionId = 6;
-      optional int64 messageSequence = 7;  
-      optional int32 numberOfParts= 8;
-      optional int32 partNumber= 9;
-      optional bytes payload= 10;
-      optional bytes messageId =11;
-      optional bytes correlationId = 12;
+      optional bool replayed = 4;
+      optional int32 type =5;  
+	    optional bytes producerId = 6;
+	    optional int32 sessionId = 7;
+      optional int64 messageSequence = 8;  
+      optional int32 numberOfParts= 9;
+      optional int32 partNumber= 10;
+      optional bytes payload= 11;
+      optional bytes messageId =12;
+      optional bytes correlationId = 13;
 	  
     }
     
     message AckData {
      //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
        //| option java_type_method = "MessageType";
+       required int64 id =1;
        optional int64 startSequence =2;
        optional int64 endSequence =3;
-       optional bytes fromAddress =4;
-       optional int64 sessionId = 5;
+       optional int64 sessionId = 4;
+    }
+    
+    
+    
+    message NackData {
+     //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
+       //| option java_type_method = "MessageType";
+       required int64 id =1;
+       optional int64 startSequence =2;
+       optional int64 endSequence =3;
+       optional int64 sessionId = 4;
+    }
+    
+    message ControlData {
+     //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
+       //| option java_type_method = "MessageType";
+       required int64 lastId =1; //last ack or nack id
     }
     
     message DestinationData {

Added: activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/reliable/swp
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/reliable/swp?rev=738017&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/reliable/swp (added)
+++ activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/reliable/swp Tue Jan 27 07:30:16 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activeblaze.impl.reliable.swp.SwpProcessor

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java Tue Jan 27 07:30:16 2009
@@ -50,7 +50,7 @@
         msg.setText("value");
         for (int i = 0; i < count; i++) {
             sender.broadcast(destination, msg);
-            Thread.sleep(100);
+            //Thread.sleep(100);
         }
         latch.await(10, TimeUnit.SECONDS);
         receiver.stop();

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java?rev=738017&r1=738016&r2=738017&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java Tue Jan 27 07:30:16 2009
@@ -28,8 +28,8 @@
     public void testStart() throws Exception {
         final AtomicBoolean test = new AtomicBoolean();
         DefaultChainedProcessor target = new DefaultChainedProcessor() {
-            public boolean start() {
-                return test.getAndSet(true);
+            public void doStart() {
+                test.getAndSet(true);
             }
         };
         DefaultChainedProcessor A = new DefaultChainedProcessor();
@@ -43,8 +43,8 @@
     public void testStop() throws Exception {
         final AtomicBoolean test = new AtomicBoolean();
         DefaultChainedProcessor target = new DefaultChainedProcessor() {
-            public boolean stop() {
-                return test.getAndSet(true);
+            public void doStop() {
+                test.getAndSet(true);
             }
         };
         DefaultChainedProcessor A = new DefaultChainedProcessor();

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkProcessor.java?rev=738017&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkProcessor.java Tue Jan 27 07:30:16 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable.swp;
+
+import java.net.SocketAddress;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
+import org.apache.activeblaze.impl.processor.Packet;
+
+/**
+ * Changes downStream() to upStream and vice versa
+ * 
+ */
+public class MockNetworkProcessor extends DefaultChainedProcessor {
+    int count = 0;
+    int loss = 3;
+    final MockNetworkSwitch networkSwitch;
+    final SocketAddress address;
+
+    MockNetworkProcessor(MockNetworkSwitch networkSwitch, SocketAddress address) {
+        this.networkSwitch = networkSwitch;
+        this.address = address;
+    }
+
+    public void downStream(Packet packet) throws Exception {
+        this.count++;
+        if (this.loss <= 0 || this.count % this.loss != 0) {
+            this.networkSwitch.deliver(this, packet);
+        } else {
+            System.err.println("LOST " + packet);
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkSwitch.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkSwitch.java?rev=738017&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkSwitch.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkSwitch.java Tue Jan 27 07:30:16 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable.swp;
+
+import java.net.InetSocketAddress;
+import org.apache.activeblaze.impl.processor.Packet;
+
+/**
+ * Mock network
+ * 
+ */
+public class MockNetworkSwitch {
+    private final MockNetworkProcessor peer1;
+    private final MockNetworkProcessor peer2;
+
+    MockNetworkSwitch() {
+        this.peer1 = new MockNetworkProcessor(this, new InetSocketAddress("localhost",1));
+        this.peer2 = new MockNetworkProcessor(this,new InetSocketAddress("localhost",2));
+    }
+
+    MockNetworkProcessor getPeer1() {
+        return this.peer1;
+    }
+
+    MockNetworkProcessor getPeer2() {
+        return this.peer2;
+    }
+
+    void deliver(MockNetworkProcessor peer, Packet p) throws Exception {
+        MockNetworkProcessor target = peer == this.peer1 ? this.peer2 : this.peer1;
+        p.setTo(target.address);
+        p.setFrom(target.address);
+        target.upStream(p);
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/MockNetworkSwitch.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java?rev=738017&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java Tue Jan 27 07:30:16 2009
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.reliable.swp;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import junit.framework.TestCase;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.transport.UdpTransport;
+import org.apache.activeblaze.util.IdGenerator;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * Test the SwpProcessor
+ * 
+ */
+public class SwpProcessorTest extends TestCase {
+    private IdGenerator idGenerator = new IdGenerator();
+    private URI receiverURI;
+    private URI senderURI;
+    SocketAddress to;
+    SwpProcessor producer;
+    SwpProcessor consumer;
+
+    /**
+     * @throws java.lang.Exception
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+        this.receiverURI = new URI("udp://localhost:6966");
+        this.senderURI = new URI("udp://localhost:6766");
+        this.to = new InetSocketAddress(receiverURI.getHost(), receiverURI.getPort());
+        this.producer = new SwpProcessor();
+        this.consumer = new SwpProcessor();
+    }
+
+    /**
+     * @throws java.lang.Exception
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (this.producer != null) {
+            this.producer.shutDown();
+        }
+        if (this.consumer != null) {
+            this.consumer.shutDown();
+        }
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void XtestProcessor() throws Exception {
+        final List<Packet> results = new ArrayList<Packet>();
+        final int number = 50000;
+        final CountDownLatch latch = new CountDownLatch(number);
+        DefaultChainedProcessor test = new DefaultChainedProcessor() {
+            public void upStream(Packet p) {
+                results.add(p);
+                latch.countDown();
+            }
+        };
+        UdpTransport sender = new UdpTransport();
+        sender.setLocalURI(this.senderURI);
+        this.producer.setNextChain(sender);
+        this.producer.start();
+        this.consumer.setPrev(test);
+        UdpTransport receiver = new UdpTransport();
+        this.consumer.setNextChain(receiver);
+        receiver.setLocalURI(this.receiverURI);
+        this.consumer.start();
+        for (int i = 0; i < number; i++) {
+            Packet packet = createPacket(this.to);
+            this.producer.downStream(packet);
+        }
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(number, results.size());
+        long seq = -1;
+        for (Packet p : results) {
+            if (seq == -1) {
+                seq = p.getMessageSequence();
+            } else {
+                assertEquals(seq, p.getMessageSequence());
+            }
+            seq++;
+        }
+    }
+    
+    public void testResponseRequiredProcessor() throws Exception {
+        final List<Packet> results = new ArrayList<Packet>();
+        final int number = 100;
+        final CountDownLatch latch = new CountDownLatch(number);
+        DefaultChainedProcessor test = new DefaultChainedProcessor() {
+            public void upStream(Packet p) {
+                results.add(p);
+                latch.countDown();
+            }
+        };
+        UdpTransport sender = new UdpTransport();
+        sender.setLocalURI(this.senderURI);
+        this.producer.setNextChain(sender);
+        this.producer.start();
+        this.consumer.setPrev(test);
+        UdpTransport receiver = new UdpTransport();
+        this.consumer.setNextChain(receiver);
+        receiver.setLocalURI(this.receiverURI);
+        this.consumer.start();
+        for (int i = 0; i < number; i++) {
+            Packet packet = createPacket(this.to);
+            packet.getPacketData().setResponseRequired(true);
+            this.producer.downStream(packet);
+        }
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(number, results.size());
+        long seq = -1;
+        for (Packet p : results) {
+            if (seq == -1) {
+                seq = p.getMessageSequence();
+            } else {
+                assertEquals(seq, p.getMessageSequence());
+            }
+            seq++;
+        }
+    }
+
+    protected Packet createPacket(SocketAddress to) throws Exception {
+        PacketData data = new PacketData();
+        data.setMessageId(new Buffer(this.idGenerator.generateId()));
+        Buffer payload = new Buffer(new byte[1024]);
+        data.setPayload(payload);
+        Packet packet = new Packet(data);
+        packet.setTo(to);
+        return packet;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message