activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r1504961 [5/11] - in /activemq/activemq-blaze/trunk: ./ src/main/java/org/apache/activeblaze/ src/main/java/org/apache/activeblaze/cluster/ src/main/java/org/apache/activeblaze/group/ src/main/java/org/apache/activeblaze/impl/destination/ s...
Date Fri, 19 Jul 2013 18:44:24 GMT
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReceivedPacket.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReceivedPacket.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReceivedPacket.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReceivedPacket.java Fri Jul 19 18:44:21 2013
@@ -16,28 +16,28 @@
  */
 package org.apache.activeblaze.impl.reliable;
 
-import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.wire.Packet;
 
 /**
  * @author rajdavies
- *
  */
 class ReceivedPacket extends ReliablePacket {
 
     private final long timestamp;
+
     /**
      * Constructor
-     * @param packet 
      */
     ReceivedPacket(Packet packet) {
         super(packet);
-        this.timestamp=System.currentTimeMillis();
+        this.timestamp = System.currentTimeMillis();
     }
+
     /**
      * @return the timestamp
      */
     public long getTimestamp() {
         return this.timestamp;
     }
-    
+
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableBuffer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableBuffer.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableBuffer.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableBuffer.java Fri Jul 19 18:44:21 2013
@@ -20,11 +20,11 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.activeblaze.impl.processor.Packet;
+
+import org.apache.activeblaze.wire.Packet;
 
 /**
  * Holds a buffer of Packets to replay
- * 
  */
 public class ReliableBuffer {
     Map<String, ReliablePacket> idMap = new HashMap<String, ReliablePacket>();
@@ -56,14 +56,12 @@ public class ReliableBuffer {
 
     /**
      * Add a Packet
-     * 
-     * @param p
      */
     public synchronized void addPacket(Packet p) {
         ReliablePacket reliablePacket = new ReliablePacket(p);
         if (this.idMap.put(p.getId(), reliablePacket) == null) {
             this.sequenceMap.put(reliablePacket.getSequence(), reliablePacket);
-            this.bufferSize += p.getPacketData().serializedSizeFramed();
+            this.bufferSize++;
             if (this.root == null) {
                 this.root = reliablePacket;
             } else {
@@ -75,14 +73,12 @@ public class ReliableBuffer {
 
     /**
      * Add packet in order
-     * 
-     * @param p
      */
     public synchronized void addPacketInOrder(Packet p) {
         ReliablePacket reliablePacket = new ReliablePacket(p);
         if (this.idMap.put(p.getId(), reliablePacket) == null) {
             this.sequenceMap.put(reliablePacket.getSequence(), reliablePacket);
-            this.bufferSize += p.getPacketData().serializedSizeFramed();
+            this.bufferSize++;
             if (this.root == null) {
                 this.root = reliablePacket;
                 this.tail = reliablePacket;
@@ -141,8 +137,7 @@ public class ReliableBuffer {
 
     /**
      * Get a Packet from the buffer
-     * 
-     * @param id
+     *
      * @return the Packet
      */
     public synchronized Packet getPacket(String id) {
@@ -152,8 +147,7 @@ public class ReliableBuffer {
 
     /**
      * Get a Packet from the buffer
-     * 
-     * @param id
+     *
      * @return the Packet
      */
     public synchronized Packet getPacket(long id) {
@@ -163,8 +157,7 @@ public class ReliableBuffer {
 
     /**
      * Get the next Packet form the buffer
-     * 
-     * @param p
+     *
      * @return the next Packet from the buffer
      */
     public synchronized Packet getNext(Packet p) {
@@ -174,9 +167,7 @@ public class ReliableBuffer {
 
     /**
      * Get a list of Packetd from the buffer
-     * 
-     * @param start
-     * @param end
+     *
      * @return the list of type <Code>Packet</Code>
      */
     public synchronized List<Packet> getPackets(long start, long end) {
@@ -202,8 +193,6 @@ public class ReliableBuffer {
 
     /**
      * Remove a packet from the buffer
-     * 
-     * @param p
      */
     public synchronized void removePacket(Packet p) {
         removePacket(p.getId());
@@ -211,9 +200,9 @@ public class ReliableBuffer {
 
     /**
      * Remove a packet from the buffer
-     * 
+     *
      * @param id -
-     *            the id of the Packet
+     *           the id of the Packet
      */
     public synchronized void removePacket(String id) {
         ReliablePacket reliablePacket = this.idMap.remove(id);
@@ -222,8 +211,6 @@ public class ReliableBuffer {
 
     /**
      * Remove a Packet from the buffer
-     * 
-     * @param sequenceNumber
      */
     public synchronized void removePacket(long sequenceNumber) {
         ReliablePacket reliablePacket = this.sequenceMap.remove(new Long(sequenceNumber));
@@ -232,9 +219,6 @@ public class ReliableBuffer {
 
     /**
      * Remove Packets from the buffer
-     * 
-     * @param start
-     * @param end
      */
     public synchronized void removePackets(long start, long end) {
         ReliablePacket packet = this.root;
@@ -281,7 +265,7 @@ public class ReliableBuffer {
                 this.tail = (ReliablePacket) reliablePacket.getPrevious();
             }
             reliablePacket.unlink();
-            this.bufferSize -= reliablePacket.getPacket().getPacketData().serializedSizeFramed();
+            this.bufferSize--;
         }
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliableFactory.java Fri Jul 19 18:44:21 2013
@@ -17,6 +17,7 @@
 package org.apache.activeblaze.impl.reliable;
 
 import java.util.Map;
+
 import org.apache.activeblaze.impl.processor.ChainedProcessor;
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.util.ObjectFinder;
@@ -24,34 +25,31 @@ import org.apache.activeblaze.util.Prope
 
 /**
  * Find a reliable implementation
- *
  */
 public class ReliableFactory {
-    
+
     private static final ObjectFinder OBJECT_FINDER = new ObjectFinder("META-INF/services/org/apache/activeblaze/reliable/");
 
     /**
-     * @param location
      * @return the configured transport from its URI
-     * @throws Exception
      */
     public static DefaultChainedProcessor get(String location) throws Exception {
-        DefaultChainedProcessor result  = findReliable(location);
+        DefaultChainedProcessor result = findReliable(location);
         configure(result, location);
         return result;
     }
-    
+
     static void configure(ChainedProcessor transport, String location) throws Exception {
         Map<String, String> options = PropertyUtil.parseParameters(location);
         PropertyUtil.setProperties(transport, options);
     }
-    
+
     private static DefaultChainedProcessor findReliable(String location) throws Exception {
-    String scheme = PropertyUtil.stripBefore(location, '?');
-    if (scheme == null) {
-        throw new IllegalArgumentException("Reliability scheme not specified: [" + location + "]");
-    }
-    DefaultChainedProcessor result = (DefaultChainedProcessor) OBJECT_FINDER.newInstance(scheme);
-    return result;
+        String scheme = PropertyUtil.stripBefore(location, '?');
+        if (scheme == null) {
+            throw new IllegalArgumentException("Reliability scheme not specified: [" + location + "]");
+        }
+        DefaultChainedProcessor result = (DefaultChainedProcessor) OBJECT_FINDER.newInstance(scheme);
+        return result;
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliablePacket.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliablePacket.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliablePacket.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/ReliablePacket.java Fri Jul 19 18:44:21 2013
@@ -16,12 +16,11 @@
  */
 package org.apache.activeblaze.impl.reliable;
 
-import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.util.LinkedNode;
+import org.apache.activeblaze.wire.Packet;
 
 /**
  * Wrapper for a Packet
- * 
  */
 class ReliablePacket extends LinkedNode {
     private final Packet packet;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/package.html Fri Jul 19 18:44:21 2013
@@ -1,18 +1,18 @@
 <!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-   
-    http://www.apache.org/licenses/LICENSE-2.0
-   
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 -->
 <html>
 <head>

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/SimpleReliableProcessor.java Fri Jul 19 18:44:21 2013
@@ -17,13 +17,13 @@
 package org.apache.activeblaze.impl.reliable.simple;
 
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
-import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.wire.Packet;
+
 /**
  * Very basic (none) reliability
- * 
  */
 public class SimpleReliableProcessor extends DefaultChainedProcessor {
-    int maxWindowSize = 64 * 1024;
+    int maxWindowSize = 1024;
     int windowSize = 0;
     int pauseTime = 0;
 
@@ -34,12 +34,10 @@ public class SimpleReliableProcessor ext
     }
 
     /**
-     * @param p
-     * @throws Exception
-     * @see org.apache.activeblaze.impl.processor.DefaultChainedProcessor#downStream(org.apache.activeblaze.impl.processor.Packet)
+     * @see org.apache.activeblaze.impl.processor.DefaultChainedProcessor#downStream(org.apache.activeblaze.wire.Packet)
      */
     public void downStream(Packet p) throws Exception {
-        this.windowSize += p.getPacketData().serializedSizeFramed();
+        this.windowSize++;
         if (this.windowSize >= this.maxWindowSize) {
             Thread.sleep(this.pauseTime);
             this.windowSize = 0;
@@ -55,8 +53,7 @@ public class SimpleReliableProcessor ext
     }
 
     /**
-     * @param maxWindowSize
-     *            the maxWindowSize to set
+     * @param maxWindowSize the maxWindowSize to set
      */
     public void setMaxWindowSize(int maxWindowSize) {
         this.maxWindowSize = maxWindowSize;
@@ -70,8 +67,7 @@ public class SimpleReliableProcessor ext
     }
 
     /**
-     * @param pauseTime
-     *            the pauseTime to set
+     * @param pauseTime the pauseTime to set
      */
     public void setPauseTime(int pauseTime) {
         this.pauseTime = pauseTime;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/simple/package.html Fri Jul 19 18:44:21 2013
@@ -1,18 +1,18 @@
 <!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-   
-    http://www.apache.org/licenses/LICENSE-2.0
-   
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 -->
 <html>
 <head>

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java Fri Jul 19 18:44:21 2013
@@ -22,24 +22,21 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.reliable.ReliableBuffer;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activeblaze.wire.AckData.AckDataBean;
-import org.apache.activeblaze.wire.NackData.NackDataBean;
-import org.apache.activeblaze.wire.PacketData.PacketDataBean;
+import org.apache.activeblaze.wire.Ack;
+import org.apache.activeblaze.wire.Nack;
+import org.apache.activeblaze.wire.Packet;
+import org.apache.activeblaze.wire.PacketType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
  * Consumer part of SWP
- * 
  */
 public class ConsumerProcessor {
     static final Log LOG = LogFactory.getLog(ConsumerProcessor.class);
     private static final long NOT_SET = -1l;
-    private int maxWindowSize = 16 * 1024;
+    private int maxWindowSize = 64;
     private int rtt = 1000;
     private long lastAckTime;
     private long firstSequence = NOT_SET;
@@ -58,9 +55,8 @@ public class ConsumerProcessor {
     }
 
     void processInBound(Packet packet) throws Exception {
-        PacketData packetData = packet.getPacketData();
-        MessageType type = packetData.getMessageType();
-        if (type == MessageType.CONTROL_DATA) {
+        int type = packet.getPacketType();
+        if (type == PacketType.CONTROL.getNumber()) {
             if (this.replayBuffer.isEmpty()) {
                 // send back a control message
             }
@@ -78,7 +74,7 @@ public class ConsumerProcessor {
                         this.firstSequence = sequence;
                     }
                     this.lastSequence = sequence;
-                    this.bufferSize += packet.getPacketData().serializedSizeFramed();
+                    this.bufferSize++;
                 } finally {
                     this.lock.unlock();
                 }
@@ -109,22 +105,17 @@ public class ConsumerProcessor {
                     }
                 } else if (!packet.isReplayed() && !this.replayBuffer.isEmpty()) {
                     // request the sequence
-                    NackDataBean nack = new NackDataBean();
+                    Nack nack = new Nack();
                     this.lock.lock();
                     try {
                         nack.setStartSequence(this.lastSequence + 1);
                         nack.setEndSequence(packet.getMessageSequence() - 1);
-                        nack.setSessionId(packet.getPacketData().getSessionId());
-                        nack.setId(this.ackSequence.incrementAndGet());
-                        PacketDataBean pd = new PacketDataBean();
-                        pd.setResponseRequired(false);
-                        pd.setPayload(nack.freeze().toUnframedBuffer());
-                        pd.setMessageType(MessageType.NACK_DATA);
-                        Packet nackPacket = new Packet(pd.freeze());
-                        nackPacket.setTo(this.peerAddress);
-                        this.swp.sendDownStream(nackPacket);
+                        nack.setMessageSequence(this.ackSequence.incrementAndGet());
+                        nack.setResponseRequired(false);
+                        nack.setTo(this.peerAddress);
+                        this.swp.sendDownStream(nack);
                         LOG.debug(this + " Sending Nack: " + nack.getStartSequence() + " , " + nack.getEndSequence());
-                        this.lastAck = nackPacket;
+                        this.lastAck = nack;
                     } finally {
                         this.lock.unlock();
                     }
@@ -135,10 +126,7 @@ public class ConsumerProcessor {
     }
 
     /**
-     * 
-     * @param timeStamp
      * @return if still valid
-     * @throws Exception
      */
     boolean control(long timeStamp) throws Exception {
         boolean result = false;
@@ -153,18 +141,15 @@ public class ConsumerProcessor {
         this.lock.lock();
         try {
             this.bufferSize = 0;
-            AckDataBean ack = new AckDataBean();
+            Ack ack = new Ack();
             ack.setStartSequence(this.firstSequence);
             ack.setEndSequence(this.lastSequence);
-            ack.setId(this.ackSequence.incrementAndGet());
-            PacketDataBean pd = new PacketDataBean();
-            pd.setResponseRequired(false);
-            pd.setPayload(ack.freeze().toUnframedBuffer());
-            pd.setMessageType(MessageType.ACK_DATA);
-            ackPacket = new Packet(pd.freeze());
-            ackPacket.setTo(this.peerAddress);
+            ack.setMessageSequence(this.ackSequence.incrementAndGet());
+            ack.setResponseRequired(false);
+            ack.setTo(this.peerAddress);
             this.lastAckTime = System.currentTimeMillis();
             this.firstSequence = this.lastSequence;
+            ackPacket = ack;
             LOG.debug(this + " Sent Ack " + ack);
         } finally {
             this.lock.unlock();
@@ -173,6 +158,10 @@ public class ConsumerProcessor {
         this.lastAck = ackPacket;
     }
 
+    /**
+     * @return String
+     * @see java.lang.Object#toString()
+     */
     public String toString() {
         return "ConsumerProcessor(" + this.peerAddress + ")";
     }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java Fri Jul 19 18:44:21 2013
@@ -25,53 +25,42 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.activeblaze.BlazeNoRouteException;
-import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.reliable.ReliableBuffer;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activeblaze.wire.AckData.AckDataBuffer;
-import org.apache.activeblaze.wire.ControlData.ControlDataBean;
-import org.apache.activeblaze.wire.NackData.NackDataBuffer;
-import org.apache.activeblaze.wire.PacketData.PacketDataBean;
+import org.apache.activeblaze.wire.Ack;
+import org.apache.activeblaze.wire.Control;
+import org.apache.activeblaze.wire.Nack;
+import org.apache.activeblaze.wire.Packet;
+import org.apache.activeblaze.wire.PacketType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 /**
  * state on a request
- * 
  */
 public class ProducerProcessor {
     static final Log LOG = LogFactory.getLog(ProducerProcessor.class);
     private static final long NOT_SET = -1l;
-    private int maxWindowSize = 32 * 1024;
+    private int maxWindowSize = 1024;
     private int rtt = 5000;
     private ReliableBuffer replayBuffer = new ReliableBuffer();
     private final Lock lock = new ReentrantLock();
     private final Condition full = this.lock.newCondition();
     private final SwpProcessor swp;
     private final SocketAddress peerAddress;
-    private final int sessionId;
     private long lastAckId = NOT_SET;
     private long lastAckTime = NOT_SET;
     private AtomicLong sendSequence = new AtomicLong(1);
 
-    ProducerProcessor(SwpProcessor swp, SocketAddress peerAddress, int sessionId) {
+    ProducerProcessor(SwpProcessor swp, SocketAddress peerAddress) {
         this.swp = swp;
         this.peerAddress = peerAddress;
-        this.sessionId = sessionId;
     }
 
     /**
      * blocks until it can send the packet
-     * 
-     * @param packet
-     * @throws BlazeNoRouteException
-     * 
      */
     void processOutbound(final Packet packet) throws BlazeNoRouteException {
-        PacketDataBean bean = packet.getPacketData().copy();
-        bean.setSessionId(this.sessionId);
-        bean.setMessageSequence(this.sendSequence.incrementAndGet());
-        packet.setPacketData(bean.freeze());
+        packet.setMessageSequence(this.sendSequence.incrementAndGet());
         this.lock.lock();
         try {
             this.replayBuffer.addPacket(packet);
@@ -82,11 +71,11 @@ public class ProducerProcessor {
             if (windowSize >= this.maxWindowSize) {
                 if (!this.full.await(this.rtt, TimeUnit.MILLISECONDS)) {
                     this.replayBuffer.clear();
-                    throw new BlazeNoRouteException("No route to "+packet.getTo());
+                    throw new BlazeNoRouteException("No route to " + packet.getTo());
                 }
             }
         } catch (InterruptedException e) {
-            //ignore - we are shutting down
+            // ignore - we are shutting down
         } finally {
             this.lock.unlock();
         }
@@ -94,23 +83,23 @@ public class ProducerProcessor {
 
     Packet processInbound(Packet packet) throws Exception {
         Packet result = null;
-        PacketData data = packet.getPacketData();
-        if (data != null) {
-            MessageType type = data.getMessageType();
-            if (type == MessageType.ACK_DATA) {
-                AckDataBuffer ackData = AckDataBuffer.parseUnframed(data.getPayload());
-                long start = ackData.getStartSequence();
-                long end = ackData.getEndSequence();
+
+        if (packet != null) {
+            int type = packet.getPacketType();
+            if (type == PacketType.ACK.getNumber()) {
+                Ack ack = (Ack) packet;
+                long start = ack.getStartSequence();
+                long end = ack.getEndSequence();
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(this + " Got Ack = " + ackData.getId() + ": " + ackData.getStartSequence() + ","
-                            + ackData.getEndSequence() + " [" + this.replayBuffer.size() + "]");
+                    LOG.debug(this + " Got Ack = " + ack.getId() + ": " + ack.getStartSequence() + ","
+                            + ack.getEndSequence() + " [" + this.replayBuffer.size() + "]");
                 }
                 this.replayBuffer.removePackets(start, end);
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(this + " Processed Ack = " + ackData.getId() + ": " + ackData.getStartSequence() + ","
-                            + ackData.getEndSequence() + " [" + this.replayBuffer.size() + "]");
+                    LOG.debug(this + " Processed Ack = " + ack.getId() + ": " + ack.getStartSequence() + ","
+                            + ack.getEndSequence() + " [" + this.replayBuffer.size() + "]");
                 }
-                this.lastAckId = ackData.getId();
+                this.lastAckId = ack.getMessageSequence();
                 this.lastAckTime = System.currentTimeMillis();
                 if (this.replayBuffer.getBufferSize() <= this.maxWindowSize) {
                     this.lock.lock();
@@ -120,14 +109,14 @@ public class ProducerProcessor {
                         this.lock.unlock();
                     }
                 }
-            } else if (type == MessageType.NACK_DATA) {
+            } else if (type == PacketType.NACK.getNumber()) {
                 this.lastAckTime = System.currentTimeMillis();
-                NackDataBuffer nackData = NackDataBuffer.parseUnframed(data.getPayload());
-                this.lastAckId = nackData.getId();
-                LOG.debug(this + " Got Nack = " + nackData);
+                Nack nack = (Nack) packet;
+                this.lastAckId = nack.getMessageSequence();
+                LOG.debug(this + " Got Nack = " + nack);
                 // lookup any missed messages
-                long start = nackData.getStartSequence();
-                long end = nackData.getEndSequence();
+                long start = nack.getStartSequence();
+                long end = nack.getEndSequence();
                 List<Packet> list = this.replayBuffer.getPackets(start, end);
                 LOG.debug(this + " Replaying " + list);
                 for (Packet p : list) {
@@ -141,31 +130,24 @@ public class ProducerProcessor {
     }
 
     /**
-     * 
-     * @param timeStamp
      * @return true if still valid
-     * @throws Exception
      */
     boolean control(long timeStamp) throws Exception {
         boolean result = false;
         if ((this.lastAckTime + (this.rtt / 2)) < timeStamp) {
             // send a control message
-            Packet ackPacket = null;
+            Control control = new Control();
             this.lock.lock();
             try {
-                ControlDataBean control = new ControlDataBean();
-                control.setLastId(this.lastAckId);
-                PacketDataBean pd = new PacketDataBean();
-                pd.setResponseRequired(false);
-                pd.setPayload(control.freeze().toUnframedBuffer());
-                pd.setMessageType(MessageType.CONTROL_DATA);
-                ackPacket = new Packet(pd.freeze());
-                ackPacket.setTo(this.peerAddress);
-                LOG.debug(this + " Sent Control message " + control);
+
+                control.setLastMessageSequence(this.lastAckId);
+                control.setTo(this.peerAddress);
+
+                LOG.debug(this + " Sending Control message " + control);
             } finally {
                 this.lock.unlock();
             }
-            this.swp.sendDownStream(ackPacket);
+            this.swp.sendDownStream(control);
         } else if (this.lastAckTime + (this.rtt * 2) < timeStamp) {
             // no longer valid
             LOG.debug(this + " Not valid: Last AckTime " + this.lastAckTime + " , " + this.rtt + " , " + timeStamp);
@@ -174,6 +156,10 @@ public class ProducerProcessor {
         return result;
     }
 
+    /**
+     * @return String
+     * @see java.lang.Object#toString()
+     */
     public String toString() {
         return "ProducerProcessor(" + this.peerAddress + ")";
     }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java Fri Jul 19 18:44:21 2013
@@ -23,21 +23,20 @@ import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
-import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.util.LRUCache;
 import org.apache.activeblaze.util.SendRequest;
-import org.apache.activemq.protobuf.Buffer;
+import org.apache.activeblaze.wire.Packet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
  * This is a sliding window protocol for unicast reliability
- * 
  */
 public class SwpProcessor extends DefaultChainedProcessor {
     static final Log LOG = LogFactory.getLog(SwpProcessor.class);
-    private Map<Buffer, SendRequest> messageRequests;
+    private Map<Packet, SendRequest> messageRequests;
     private int maxConcurrentRequests = 1000;
     private int maxWindowSize = 16 * 1024;
     private int windowSize = 0;
@@ -71,7 +70,7 @@ public class SwpProcessor extends Defaul
 
     public void doInit() throws Exception {
         super.doInit();
-        this.messageRequests = new LRUCache<Buffer, SendRequest>(this.maxConcurrentRequests);
+        this.messageRequests = new LRUCache<Packet, SendRequest>(this.maxConcurrentRequests);
     }
 
     public void doStart() throws Exception {
@@ -111,14 +110,15 @@ public class SwpProcessor extends Defaul
             // Make sure we shutdown the timer before shutting down the down stream 
             // processors to avoid the timer getting errors.
             final CountDownLatch done = new CountDownLatch(1);
-            this.statusTimer.schedule(new TimerTask(){
+            this.statusTimer.schedule(new TimerTask() {
                 @Override
                 public void run() {
                     statusTimer.cancel();
                     done.countDown();
-                }}, 0);
+                }
+            }, 0);
             done.await();
-            this.statusTimer=null;
+            this.statusTimer = null;
         }
         super.doStop();
     }
@@ -134,15 +134,14 @@ public class SwpProcessor extends Defaul
     /**
      * @return the messageRequests
      */
-    public Map<Buffer, SendRequest> getMessageRequests() {
+    public Map<Packet, SendRequest> getMessageRequests() {
         return this.messageRequests;
     }
 
     /**
-     * @param messageRequests
-     *            the messageRequests to set
+     * @param messageRequests the messageRequests to set
      */
-    public void setMessageRequests(Map<Buffer, SendRequest> messageRequests) {
+    public void setMessageRequests(Map<Packet, SendRequest> messageRequests) {
         this.messageRequests = messageRequests;
     }
 
@@ -154,8 +153,7 @@ public class SwpProcessor extends Defaul
     }
 
     /**
-     * @param maxConcurrentRequests
-     *            the maxConcurrentRequests to set
+     * @param maxConcurrentRequests the maxConcurrentRequests to set
      */
     public void setMaxConcurrentRequests(int maxConcurrentRequests) {
         this.maxConcurrentRequests = maxConcurrentRequests;
@@ -169,8 +167,7 @@ public class SwpProcessor extends Defaul
     }
 
     /**
-     * @param maxWindowSize
-     *            the maxWindowSize to set
+     * @param maxWindowSize the maxWindowSize to set
      */
     public void setMaxWindowSize(int maxWindowSize) {
         this.maxWindowSize = maxWindowSize;
@@ -184,8 +181,7 @@ public class SwpProcessor extends Defaul
     }
 
     /**
-     * @param windowSize
-     *            the windowSize to set
+     * @param windowSize the windowSize to set
      */
     public void setWindowSize(int windowSize) {
         this.windowSize = windowSize;
@@ -196,7 +192,7 @@ public class SwpProcessor extends Defaul
         synchronized (this.producers) {
             result = this.producers.get(peer);
             if (result == null) {
-                result = new ProducerProcessor(this, peer, this.session.incrementAndGet());
+                result = new ProducerProcessor(this, peer);
                 this.producers.put(peer, result);
             }
         }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/package.html Fri Jul 19 18:44:21 2013
@@ -1,18 +1,18 @@
 <!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-   
-    http://www.apache.org/licenses/LICENSE-2.0
-   
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 -->
 <html>
 <head>

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Fri Jul 19 18:44:21 2013
@@ -16,46 +16,40 @@
  */
 package org.apache.activeblaze.impl.transport;
 
+import java.net.SocketAddress;
 import java.net.URI;
 import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.activeblaze.impl.processor.Packet;
-import org.apache.activeblaze.impl.processor.PacketAudit;
-import org.apache.activemq.protobuf.Buffer;
+
+import org.apache.activeblaze.wire.Buffer;
+import org.apache.activeblaze.wire.BufferInputStream;
+import org.apache.activeblaze.wire.BufferOutputStream;
+import org.apache.activeblaze.wire.Packet;
+import org.apache.activeblaze.wire.PacketAudit;
+import org.apache.activeblaze.wire.PacketType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
  * Base Class for transports
- * 
  */
 public abstract class BaseTransport extends ThreadChainedProcessor {
     private static final Log LOG = LogFactory.getLog(BaseTransport.class);
-
+    protected static final short MAGIC = 0xFAB;
     static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-
     private URI localURI;
-
     private Buffer bufferOfLocalURI;
-
     private int bufferSize = DEFAULT_BUFFER_SIZE;
-
     private int soTimeout = 2000;
-
     private int timeToLive = 1;
-
     private boolean loopBack = false;
-
     protected final PacketAudit audit = new PacketAudit();
-
     private boolean broadcast = true;
-
     private boolean enableAudit = false;
-
     private int maxDispatchQueueSize = 10000;
-
     private LinkedBlockingQueue<Packet> dispatchQueue;
-
     private Thread dispatchQueueThread;
+    private BufferOutputStream bufferOut = new BufferOutputStream(1024);
+    private byte[] intBuffer = new byte[4];
 
     public void doInit() throws Exception {
         super.doInit();
@@ -63,8 +57,7 @@ public abstract class BaseTransport exte
         if (this.localURI != null) {
             this.bufferOfLocalURI = new Buffer(this.localURI.toString());
         }
-        this.dispatchQueue = new LinkedBlockingQueue<Packet>(
-                getMaxDispatchQueueSize());
+        this.dispatchQueue = new LinkedBlockingQueue<Packet>(getMaxDispatchQueueSize());
     }
 
     public void doShutDown() throws Exception {
@@ -82,8 +75,7 @@ public abstract class BaseTransport exte
                 }
             }
         };
-        this.dispatchQueueThread = new Thread(runable, getLocalURI()
-                + "-DispatchQueue");
+        this.dispatchQueueThread = new Thread(runable, getLocalURI() + "-DispatchQueue");
         this.dispatchQueueThread.setDaemon(true);
         this.dispatchQueueThread.start();
     }
@@ -108,8 +100,7 @@ public abstract class BaseTransport exte
     }
 
     /**
-     * @param localURI
-     *            the localURI to set
+     * @param localURI the localURI to set
      */
     public void setLocalURI(URI localURI) {
         this.localURI = localURI;
@@ -126,8 +117,7 @@ public abstract class BaseTransport exte
     }
 
     /**
-     * @param bufferSize
-     *            the bufferSize to set
+     * @param bufferSize the bufferSize to set
      */
     public void setBufferSize(int bufferSize) {
         this.bufferSize = bufferSize;
@@ -141,8 +131,7 @@ public abstract class BaseTransport exte
     }
 
     /**
-     * @param soTimeout
-     *            the soTimeout to set
+     * @param soTimeout the soTimeout to set
      */
     public void setSoTimeout(int soTimeout) {
         this.soTimeout = soTimeout;
@@ -156,8 +145,7 @@ public abstract class BaseTransport exte
     }
 
     /**
-     * @param timeToLive
-     *            the timeToLive to set
+     * @param timeToLive the timeToLive to set
      */
     public void setTimeToLive(int timeToLive) {
         this.timeToLive = timeToLive;
@@ -171,8 +159,7 @@ public abstract class BaseTransport exte
     }
 
     /**
-     * @param loopBack
-     *            the loopBack to set
+     * @param loopBack the loopBack to set
      */
     public void setLoopBack(boolean loopBack) {
         this.loopBack = loopBack;
@@ -193,8 +180,7 @@ public abstract class BaseTransport exte
     }
 
     /**
-     * @param broadcast
-     *            the broadcast to set
+     * @param broadcast the broadcast to set
      */
     public void setBroadcast(boolean broadcast) {
         this.broadcast = broadcast;
@@ -208,8 +194,7 @@ public abstract class BaseTransport exte
     }
 
     /**
-     * @param enableAudit
-     *            the enableAudit to set
+     * @param enableAudit the enableAudit to set
      */
     public void setEnableAudit(boolean enableAudit) {
         this.enableAudit = enableAudit;
@@ -222,9 +207,10 @@ public abstract class BaseTransport exte
         return this.bufferOfLocalURI;
     }
 
-    public String toString() {
-        return this.localURI != null ? this.localURI.toString()
-                : " Uninitialized Transport";
+    public final String toString() {
+        String str = "" + System.identityHashCode(this) + ": ";
+        str += this.localURI != null ? this.localURI.toString() : " Uninitialized Transport";
+        return str;
     }
 
     /**
@@ -235,8 +221,7 @@ public abstract class BaseTransport exte
     }
 
     /**
-     * @param maxDispatchQueueSize
-     *            the maxDispatchQueueSize to set
+     * @param maxDispatchQueueSize the maxDispatchQueueSize to set
      */
     public void setMaxDispatchQueueSize(int maxDispatchQueueSize) {
         this.maxDispatchQueueSize = maxDispatchQueueSize;
@@ -244,7 +229,13 @@ public abstract class BaseTransport exte
 
     public void upStream(Packet packet) throws Exception {
         if (!isStopped()) {
-            this.dispatchQueue.put(packet);
+            if (!this.enableAudit || !this.audit.isDuplicate(packet)) {
+                this.dispatchQueue.put(packet);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(toString() + " Ignoring duplicate packet: " + packet);
+                }
+            }
         }
     }
 
@@ -258,7 +249,7 @@ public abstract class BaseTransport exte
         } catch (InterruptedException e1) {
             // we've stopped
         } catch (Exception e) {
-            String value="";
+            String value = "";
             try {
                 value = packet.toString();
             } catch (Throwable ignore) {
@@ -267,4 +258,55 @@ public abstract class BaseTransport exte
             stopInternal();
         }
     }
+
+    public final synchronized void downStream(Packet packet) throws Exception {
+        if (isInitialized()) {
+            if (isEnableAudit()) {
+                // add to audit
+                this.audit.isDuplicate(packet);
+            }
+            this.bufferOut.reset();
+            this.bufferOut.writeShort(MAGIC);
+            this.bufferOut.skip(4);
+            this.bufferOut.writeByte(packet.getPacketType());
+            packet.write(this.bufferOut);
+            int len = this.bufferOut.length() - 6;
+            this.intBuffer[0] = (byte) (len >>> 24);
+            this.intBuffer[1] = (byte) (len >>> 16);
+            this.intBuffer[2] = (byte) (len >>> 8);
+            this.intBuffer[3] = (byte) (len >>> 0);
+            System.arraycopy(this.intBuffer, 0, this.bufferOut.getBuffer(), this.bufferOut.getOffset() + 2,
+                    this.intBuffer.length);
+            SocketAddress to = packet.getTo();
+            sendData(to, this.bufferOut.getBuffer(), this.bufferOut.getOffset(), this.bufferOut.length());
+        }
+    }
+
+    protected final void processData(SocketAddress from, Buffer buffer) throws Exception {
+        Packet packet = buildPacket(from, buffer);
+        upStream(packet);
+    }
+
+    protected final Packet buildPacket(SocketAddress from, Buffer buffer) throws Exception {
+        Packet result = null;
+        BufferInputStream in = new BufferInputStream(buffer);
+        short magic = in.readShort();
+        if (magic == MAGIC) {
+            int len = in.readInt();
+            int type = in.readByte();
+            Packet packet = PacketType.valueOf(type).createPacket();
+            packet.read(in);
+            packet.setFrom(from);
+            result = packet;
+        } else {
+            LOG.warn("Bad Packet magic " + magic);
+        }
+        return result;
+        /*
+         * result = IOUtils.readPacket(buffer); result.setFrom(from); return
+         * result;
+         */
+    }
+
+    protected abstract void sendData(SocketAddress to, byte[] data, int offset, int length) throws Exception;
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java Fri Jul 19 18:44:21 2013
@@ -16,124 +16,148 @@
  */
 package org.apache.activeblaze.impl.transport;
 
-import java.net.DatagramPacket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
 import java.net.NetworkInterface;
 import java.net.SocketAddress;
+import java.net.StandardProtocolFamily;
+import java.net.StandardSocketOptions;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.MembershipKey;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activeblaze.BlazeException;
-import org.apache.activeblaze.impl.processor.Packet;
-import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
+import org.apache.activeblaze.wire.Buffer;
 
 /**
  * Multicast transport
- * 
  */
 public class MulticastTransport extends BaseTransport {
-    private MulticastSocket socket;
-    private String networkInterface;
+    DatagramChannel datagramChannel;
+    ByteBuffer receiveBuffer;
+    private String networkInterfaceName;
     private InetSocketAddress socketAddress;
+    private MembershipKey membershipKey;
+    private NetworkInterface networkInterface;
+    private Map<String, MembershipKey> membershipKeyMap = new ConcurrentHashMap<String, MembershipKey>();
+
 
     public void doInit() throws Exception {
         super.doInit();
-        this.socket = new MulticastSocket(getLocalURI().getPort());
-        this.socket.setTimeToLive(getTimeToLive());
-        this.socket.setLoopbackMode(isLoopBack());
-        this.socket.setSoTimeout(getSoTimeout());
-        this.socket.setReceiveBufferSize(getBufferSize());
-        this.socket.setSendBufferSize(getBufferSize());
         this.socketAddress = new InetSocketAddress(InetAddress.getByName(getLocalURI().getHost()), getLocalURI()
                 .getPort());
-        NetworkInterface ni = null;
-        if (getNetworkInterface() != null && getNetworkInterface().length() > 0) {
-            ni = NetworkInterface.getByName(getNetworkInterface());
-            if (ni == null) {
-                throw new BlazeException("Couldn't find an network interface named " + getNetworkInterface());
-            }
-        }
-        if (ni != null) {
-            this.socket.joinGroup(this.socketAddress, ni);
+
+        if (getNetworkInterfaceName() != null && getNetworkInterfaceName().isEmpty()) {
+            networkInterface = NetworkInterface.getByName(getNetworkInterfaceName());
         } else {
-            this.socket.joinGroup(this.socketAddress.getAddress());
+            networkInterface = NetworkInterface.getNetworkInterfaces().nextElement();
+            networkInterfaceName = networkInterface.getName();
+        }
+
+        if (networkInterface == null) {
+            throw new BlazeException("Couldn't find an network interface named " + getNetworkInterfaceName());
+        }
+        try {
+            receiveBuffer = ByteBuffer.allocateDirect(getBufferSize());
+            datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET);
+            if (datagramChannel.isOpen()) {
+                datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, getBufferSize());
+                datagramChannel.setOption(StandardSocketOptions.SO_SNDBUF, getBufferSize());
+
+                datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+                datagramChannel.setOption(StandardSocketOptions.IP_MULTICAST_IF, networkInterface);
+
+                InetAddress group = InetAddress.getByName(getLocalURI().getHost());
+                if (!group.isMulticastAddress()) {
+                    throw new BlazeException(getLocalURI().getHost() + " is not a multicast address");
+                }
+
+                datagramChannel.bind(new InetSocketAddress(getLocalURI().getPort()));
+                membershipKey = datagramChannel.join(group, networkInterface);
+                // so need to reset the local uri
+                URI oldURI = getLocalURI();
+                URI newURI = new URI(oldURI.getScheme(), oldURI.getUserInfo(), oldURI.getHost(), ((InetSocketAddress) datagramChannel.getLocalAddress()).getPort(), oldURI
+                        .getPath(), oldURI.getQuery(), oldURI.getFragment());
+                setLocalURI(newURI);
+            }
+        } catch (Exception e) {
+            throw new BlazeException("Could not open Datagram channel ", e);
         }
     }
 
     public void doShutDown() throws Exception {
         super.doShutDown();
-        if (this.socket != null) {
-            this.socket.close();
+
+        for (MembershipKey key : membershipKeyMap.values()) {
+            key.drop();
+        }
+        membershipKeyMap.clear();
+        if (membershipKey != null) {
+            membershipKey.drop();
+        }
+        if (this.datagramChannel != null) {
+            this.datagramChannel.close();
         }
     }
 
+
     protected void doProcess() throws Exception {
-        if (isInitialized()) {
-            byte[] receiveData = new byte[getMaxPacketSize()];
-            DatagramPacket dp = new DatagramPacket(receiveData, receiveData.length);
-            this.socket.receive(dp);
-            if (dp.getLength() > 0) {
-                PacketDataBuffer data = PacketDataBuffer.parseFramed(dp.getData());
-                SocketAddress address = dp.getSocketAddress();
-                Packet packet = new Packet(address, data);
-                if (!isEnableAudit() || !this.audit.isDuplicate(packet)) {
-                    upStream(packet);
-                }
-            }
+        SocketAddress socketAddress = datagramChannel.receive(receiveBuffer);
+        receiveBuffer.flip();
+        if (receiveBuffer.limit() > 0) {
+            Buffer buffer = new Buffer(receiveBuffer);
+            processData(socketAddress, buffer);
         }
+        receiveBuffer.clear();
     }
 
-    public synchronized void downStream(Packet packet) throws Exception {
+    public void sendData(SocketAddress to, byte[] data, int offset, int length) throws Exception {
         if (isInitialized()) {
-            if (isEnableAudit()) {
-                // add to audit
-                this.audit.isDuplicate(packet);
-            }
-            byte[] data = packet.getPacketData().toFramedByteArray();
-            SocketAddress to = packet.getTo();
-            DatagramPacket dp = new DatagramPacket(data, data.length, to);
-            this.socket.send(dp);
+            ByteBuffer byteBuffer = ByteBuffer.wrap(data, offset, length);
+            this.datagramChannel.send(byteBuffer, to);
         }
     }
 
     /**
-     * @return the networkInterface
+     * @return the networkInterfaceName
      */
-    public String getNetworkInterface() {
-        return this.networkInterface;
+    public String getNetworkInterfaceName() {
+        return this.networkInterfaceName;
     }
 
     /**
-     * @param networkInterface
-     *            the networkInterface to set
+     * @param networkInterfaceName the networkInterfaceName to set
      */
-    public void setNetworkInterface(String networkInterface) {
-        this.networkInterface = networkInterface;
+    public void setNetworkInterfaceName(String networkInterfaceName) {
+        this.networkInterfaceName = networkInterfaceName;
     }
 
     /**
      * join a multicast group
-     * 
-     * @param address
-     * @throws Exception
      */
     public void joinGroup(String address) throws Exception {
         if (isInitialized()) {
             InetAddress group = InetAddress.getByName(address);
-            this.socket.joinGroup(group);
+            if (!group.isMulticastAddress()) {
+                throw new BlazeException(address + " is not a multicast address");
+            }
+            MembershipKey key = datagramChannel.join(group, networkInterface);
+            membershipKeyMap.put(address, key);
         }
     }
 
     /**
      * leave a multicast group
-     * 
-     * @param address
-     * @throws Exception
      */
     public void leaveGroup(String address) throws Exception {
         if (isInitialized()) {
-            InetAddress group = InetAddress.getByName(address);
-            this.socket.leaveGroup(group);
+            MembershipKey key = membershipKeyMap.remove(address);
+            if (key != null) {
+                key.drop();
+            }
         }
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java Fri Jul 19 18:44:21 2013
@@ -17,13 +17,13 @@
 package org.apache.activeblaze.impl.transport;
 
 import java.net.SocketTimeoutException;
+
 import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
  * Thread associated with processing
- * 
  */
 public abstract class ThreadChainedProcessor extends DefaultChainedProcessor implements Runnable {
     private static final Log LOG = LogFactory.getLog(ThreadChainedProcessor.class);
@@ -74,8 +74,6 @@ public abstract class ThreadChainedProce
 
     /**
      * Process input for the Processor
-     * 
-     * @throws Exception
      */
     protected abstract void doProcess() throws Exception;
 
@@ -87,8 +85,7 @@ public abstract class ThreadChainedProce
     }
 
     /**
-     * @param priority
-     *            the priority to set
+     * @param priority the priority to set
      */
     public void setPriority(int priority) {
         this.priority = priority;
@@ -102,8 +99,7 @@ public abstract class ThreadChainedProce
     }
 
     /**
-     * @param daemon
-     *            the daemon to set
+     * @param daemon the daemon to set
      */
     public void setDaemon(boolean daemon) {
         this.daemon = daemon;
@@ -117,8 +113,7 @@ public abstract class ThreadChainedProce
     }
 
     /**
-     * @param name
-     *            the name to set
+     * @param name the name to set
      */
     public void setName(String name) {
         this.name = name;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java Fri Jul 19 18:44:21 2013
@@ -18,41 +18,39 @@ package org.apache.activeblaze.impl.tran
 
 import java.net.URI;
 import java.util.Map;
+
 import org.apache.activeblaze.util.ObjectFinder;
 import org.apache.activeblaze.util.PropertyUtil;
 
 
 /**
  * Find a Transport from a URI scheme
- *
  */
 public abstract class TransportFactory {
 
     private static final ObjectFinder OBJECT_FINDER = new ObjectFinder("META-INF/services/org/apache/activeblaze/transport/");
 
     /**
-     * @param location
      * @return the configured transport from its URI
-     * @throws Exception
      */
     public static BaseTransport get(URI location) throws Exception {
-        BaseTransport result  = findTransport(location);
+        BaseTransport result = findTransport(location);
         result.setLocalURI(location);
         configureTransport(result, location);
         return result;
     }
-    
+
     static void configureTransport(BaseTransport transport, URI uri) throws Exception {
         Map<String, String> options = PropertyUtil.parseParameters(uri);
         PropertyUtil.setProperties(transport, options);
     }
-    
+
     private static BaseTransport findTransport(URI location) throws Exception {
-    String scheme = location.getScheme();
-    if (scheme == null) {
-        throw new IllegalArgumentException("Transport scheme not specified: [" + location + "]");
-    }
-    BaseTransport result = (BaseTransport) OBJECT_FINDER.newInstance(scheme);
-    return result;
+        String scheme = location.getScheme();
+        if (scheme == null) {
+            throw new IllegalArgumentException("Transport scheme not specified: [" + location + "]");
+        }
+        BaseTransport result = (BaseTransport) OBJECT_FINDER.newInstance(scheme);
+        return result;
     }
 }
\ No newline at end of file

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Fri Jul 19 18:44:21 2013
@@ -16,161 +16,99 @@
  */
 package org.apache.activeblaze.impl.transport;
 
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.net.StandardProtocolFamily;
+import java.net.StandardSocketOptions;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.nio.channels.DatagramChannel;
 import java.util.Map;
 
 import org.apache.activeblaze.BlazeException;
-import org.apache.activeblaze.BlazeNoRouteException;
-import org.apache.activeblaze.impl.processor.Packet;
-import org.apache.activeblaze.util.IOUtils;
 import org.apache.activeblaze.util.LRUCache;
 import org.apache.activeblaze.util.SendRequest;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activeblaze.wire.AckData.AckDataBean;
-import org.apache.activeblaze.wire.PacketData.PacketDataBean;
-import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
-import org.apache.activemq.protobuf.Buffer;
+import org.apache.activeblaze.wire.Ack;
+import org.apache.activeblaze.wire.Buffer;
+import org.apache.activeblaze.wire.Packet;
 
 /**
  * UdpTransport
- * 
  */
 public class UdpTransport extends BaseTransport {
-    private DatagramChannel channel;
+    DatagramChannel datagramChannel;
+    ByteBuffer receiveBuffer;
+    private Map<Packet, SendRequest> messageRequests = new LRUCache<Packet, SendRequest>(5000);
 
-    private ByteBuffer inBuffer;
-
-    private ByteBuffer outBuffer;
-
-    private Map<Buffer, SendRequest<PacketDataBuffer>> messageRequests = new LRUCache<Buffer, SendRequest<PacketDataBuffer>>(
-            1000);
 
     public void doInit() throws Exception {
         super.doInit();
-        this.channel = DatagramChannel.open();
-        DatagramSocket socket = this.channel.socket();
-        SocketAddress address = null;
+        InetSocketAddress address = null;
         if (getLocalURI() != null) {
-            address = new InetSocketAddress(getLocalURI().getHost(),
-                    getLocalURI().getPort());
+            address = new InetSocketAddress(getLocalURI().getHost(), getLocalURI().getPort());
         } else {
             throw new BlazeException("localURI not set");
         }
-        socket.setBroadcast(isBroadcast());
-        socket.setReceiveBufferSize(getBufferSize());
-        socket.setSendBufferSize(getBufferSize());
-        socket.setSoTimeout(getSoTimeout());
-        this.channel.configureBlocking(true);
-        socket.bind(address);
-        // if the port was 0 - the port will be allocated automatically -
-        // so need to reset the local uri
-        URI oldURI = getLocalURI();
-        URI newURI = new URI(oldURI.getScheme(), oldURI.getUserInfo(), oldURI
-                .getHost(), socket.getLocalPort(), oldURI.getPath(), oldURI
-                .getQuery(), oldURI.getFragment());
-        setLocalURI(newURI);
-        this.inBuffer = ByteBuffer.allocateDirect(getMaxPacketSize());
-        this.outBuffer = ByteBuffer.allocateDirect(getMaxPacketSize());
+        try {
+            receiveBuffer = ByteBuffer.allocateDirect(getBufferSize());
+            datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET);
+            if (datagramChannel.isOpen()) {
+                datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, getBufferSize());
+                datagramChannel.setOption(StandardSocketOptions.SO_SNDBUF, getBufferSize());
+                datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+                datagramChannel.bind(address);
+                // so need to reset the local uri
+                URI oldURI = getLocalURI();
+                URI newURI = new URI(oldURI.getScheme(), oldURI.getUserInfo(), oldURI.getHost(), ((InetSocketAddress) datagramChannel.getLocalAddress()).getPort(), oldURI
+                        .getPath(), oldURI.getQuery(), oldURI.getFragment());
+                setLocalURI(newURI);
+            }
+        } catch (Exception e) {
+            throw new BlazeException("Could not open Datagram channel for " + getLocalURI(), e);
+        }
     }
 
     public void doShutDown() throws Exception {
         super.doShutDown();
-        if (this.channel != null) {
-            this.channel.close();
-            this.inBuffer = null;
-            this.outBuffer = null;
-            this.channel = null;
+        if (this.datagramChannel != null) {
+            this.datagramChannel.close();
         }
     }
 
     protected void doProcess() throws Exception {
-        this.inBuffer.clear();
-        SocketAddress address = this.channel.receive(this.inBuffer);
-        ByteBuffer buffer = this.inBuffer;
-        if (isInitialized()) {
-            buffer.flip();
-            while (buffer.remaining() > 0) {
-                InputStream stream = IOUtils.getByteBufferInputStream(buffer);
-                PacketDataBuffer data = PacketDataBuffer.parseFramed(stream);
-                stream.close();
-                if (data.getResponse()) {
-                    synchronized (this.messageRequests) {
-                        SendRequest<PacketDataBuffer> request = this.messageRequests.remove(data.getCorrelationId());
-                        if (request != null) {
-                            request.put(data.getMessageId(), data);
-                        }
-                    }
-                }
-                if (data.getResponseRequired()) {
-                    Packet packet = createAckPacket(data);
-                    packet.setTo(address);
-                    downStream(packet);
-                }
-                Packet packet = new Packet(address, data);
-                if (!isEnableAudit() || !this.audit.isDuplicate(packet)) {
-                    upStream(packet);
+        SocketAddress socketAddress = datagramChannel.receive(receiveBuffer);
+        receiveBuffer.flip();
+        if (receiveBuffer.limit() > 0) {
+            Buffer buffer = new Buffer(receiveBuffer);
+            Packet packet = buildPacket(socketAddress, buffer);
+            if (packet.isResponse()) {
+                SendRequest request = this.messageRequests.remove(packet.getCorrelationId());
+                if (request != null) {
+                    request.put(packet.getCorrelationId(), packet);
                 }
             }
-            buffer.clear();
+            if (packet.isResponseRequired()) {
+                Packet ack = createAckPacket(packet);
+                ack.setTo(socketAddress);
+                downStream(ack);
+            }
+            upStream(packet);
         }
+        receiveBuffer.clear();
     }
 
-    public void downStream(Packet packet) throws Exception {
-        ByteBuffer buffer = this.outBuffer;
-        if (isStarted()) {
-            SendRequest<PacketDataBuffer> request = null;
-            if (packet.isResponseRequired()) {
-                synchronized (this.messageRequests) {
-                    request = new SendRequest<PacketDataBuffer>();
-                    this.messageRequests.put(packet.getPacketData().getMessageId(), request);
-                }
-            }
-            synchronized (buffer) {
-                buffer.clear();
-                OutputStream stream = IOUtils.getByteBufferOutputStream(buffer);
-                if (isEnableAudit()) {
-                    // add to audit
-                    this.audit.isDuplicate(packet);
-                }
-                packet.getPacketData().writeFramed(stream);
-                stream.close();
-                buffer.flip();
-                this.channel.send(buffer, packet.getTo());
-            }
-            if (request != null) {
-                if (request.get(getSoTimeout()) == null) {
-                    throw new BlazeNoRouteException("No response in "
-                            + getSoTimeout() + " ms from " + packet.getTo());
-                }
-            }
-        } else {
-            if (!shutDown()) {
-                throw new BlazeException(this + " Not started - cannot send "
-                        + packet);
-            }
+    public void sendData(SocketAddress to, byte[] data, int offset, int length) throws Exception {
+        if (isInitialized()) {
+            ByteBuffer byteBuffer = ByteBuffer.wrap(data, offset, length);
+            this.datagramChannel.send(byteBuffer, to);
         }
     }
 
-    private Packet createAckPacket(PacketData data) {
-        AckDataBean ackData = new AckDataBean();
-        ackData.setSessionId(data.getSessionId());
-        ackData.setStartSequence(data.getMessageSequence());
-        ackData.setEndSequence(data.getMessageSequence());
-        PacketDataBean pd = new PacketDataBean();
-        pd.setResponseRequired(false);
-        pd.setCorrelationId(data.getMessageId());
-        pd.setResponse(true);
-        pd.setPayload(ackData.freeze().toUnframedBuffer());
-        pd.setMessageType(MessageType.ACK_DATA);
-        Packet packet = new Packet(pd.freeze());
-        return packet;
+    private Packet createAckPacket(Packet packet) {
+        Ack ack = new Ack();
+        ack.setStartSequence(packet.getMessageSequence());
+        ack.setEndSequence(packet.getMessageSequence());
+        ack.setCorrelationId(packet.getId());
+        return ack;
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/package.html Fri Jul 19 18:44:21 2013
@@ -1,18 +1,18 @@
 <!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-   
-    http://www.apache.org/licenses/LICENSE-2.0
-   
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 -->
 <html>
 <head>

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java Fri Jul 19 18:44:21 2013
@@ -18,39 +18,19 @@ package org.apache.activeblaze.jms;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
+
+import javax.jms.*;
 import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
 import org.apache.activeblaze.BlazeMessageListener;
-import org.apache.activeblaze.BlazeMessageProcessor;
 import org.apache.activeblaze.Subscription;
 import org.apache.activeblaze.group.BlazeGroupChannel;
-import org.apache.activeblaze.jms.message.BlazeJmsMessage;
-import org.apache.activeblaze.jms.message.BlazeJmsMessageTransformation;
 import org.apache.activeblaze.util.IdGenerator;
-import org.apache.activeblaze.wire.DestinationData;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activeblaze.wire.BlazeData.BlazeDataBuffer;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.InvalidProtocolBufferException;
+
 /**
  * Implementation of a JMS Connection
- * 
  */
 public class BlazeJmsConnection implements Connection, TopicConnection, QueueConnection,
-        org.apache.activeblaze.ExceptionListener, BlazeMessageProcessor {
+        org.apache.activeblaze.ExceptionListener {
     protected final BlazeGroupChannel channel;
     protected final IdGenerator tempDestinationGenerator = new IdGenerator("");
     private String clientId;
@@ -64,11 +44,9 @@ public class BlazeJmsConnection implemen
         this.channel = channel;
         this.channel.setExceptionListener(this);
         this.clientId = channel.getName();
-        this.channel.setBlazeMessageProcessor(this);
     }
 
     /**
-     * @throws JMSException
      * @see javax.jms.Connection#close()
      */
     public void close() throws JMSException {
@@ -85,44 +63,30 @@ public class BlazeJmsConnection implemen
     }
 
     /**
-     * @param destination
-     * @param messageSelector
-     * @param sessionPool
-     * @param maxMessages
      * @return ConnectionConsumer
-     * @throws JMSException
      * @see javax.jms.Connection#createConnectionConsumer(javax.jms.Destination,
      *      java.lang.String, javax.jms.ServerSessionPool, int)
      */
     public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
-            ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+                                                       ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosed();
         return null;
     }
 
     /**
-     * @param topic
-     * @param subscriptionName
-     * @param messageSelector
-     * @param sessionPool
-     * @param maxMessages
      * @return ConnectionConsumer
-     * @throws JMSException
      * @see javax.jms.Connection#createDurableConnectionConsumer(javax.jms.Topic,
      *      java.lang.String, java.lang.String, javax.jms.ServerSessionPool,
      *      int)
      */
     public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
-            String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+                                                              String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosed();
         return null;
     }
 
     /**
-     * @param transacted
-     * @param acknowledgeMode
      * @return Session
-     * @throws JMSException
      * @see javax.jms.Connection#createSession(boolean, int)
      */
     public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
@@ -158,8 +122,6 @@ public class BlazeJmsConnection implemen
     }
 
     /**
-     * @param clientID
-     * @throws JMSException
      * @see javax.jms.Connection#setClientID(java.lang.String)
      */
     public void setClientID(String clientID) throws JMSException {
@@ -175,7 +137,6 @@ public class BlazeJmsConnection implemen
     }
 
     /**
-     * @param listener
      * @see javax.jms.Connection#setExceptionListener(javax.jms.ExceptionListener)
      */
     public void setExceptionListener(ExceptionListener listener) {
@@ -183,7 +144,6 @@ public class BlazeJmsConnection implemen
     }
 
     /**
-     * @throws JMSException
      * @see javax.jms.Connection#start()
      */
     public void start() throws JMSException {
@@ -196,7 +156,6 @@ public class BlazeJmsConnection implemen
     }
 
     /**
-     * @throws JMSException
      * @see javax.jms.Connection#stop()
      */
     public void stop() throws JMSException {
@@ -209,26 +168,18 @@ public class BlazeJmsConnection implemen
     }
 
     /**
-     * @param topic
-     * @param messageSelector
-     * @param sessionPool
-     * @param maxMessages
      * @return ConnectionConsumer
-     * @throws JMSException
      * @see javax.jms.TopicConnection#createConnectionConsumer(javax.jms.Topic,
      *      java.lang.String, javax.jms.ServerSessionPool, int)
      */
     public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
-            ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+                                                       ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosed();
         return null;
     }
 
     /**
-     * @param transacted
-     * @param acknowledgeMode
      * @return TopicSession
-     * @throws JMSException
      * @see javax.jms.TopicConnection#createTopicSession(boolean, int)
      */
     public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
@@ -240,26 +191,18 @@ public class BlazeJmsConnection implemen
     }
 
     /**
-     * @param queue
-     * @param messageSelector
-     * @param sessionPool
-     * @param maxMessages
      * @return ConnectionConsumer
-     * @throws JMSException
      * @see javax.jms.QueueConnection#createConnectionConsumer(javax.jms.Queue,
      *      java.lang.String, javax.jms.ServerSessionPool, int)
      */
     public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
-            ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+                                                       ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosed();
         return null;
     }
 
     /**
-     * @param transacted
-     * @param acknowledgeMode
      * @return QueueSession
-     * @throws JMSException
      * @see javax.jms.QueueConnection#createQueueSession(boolean, int)
      */
     public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
@@ -271,7 +214,6 @@ public class BlazeJmsConnection implemen
     }
 
     /**
-     * @param ex
      * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
      */
     public void onException(Exception ex) {
@@ -279,7 +221,6 @@ public class BlazeJmsConnection implemen
     }
 
     /**
-     * @param ex
      * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
      */
     public void onException(JMSException ex) {
@@ -347,7 +288,7 @@ public class BlazeJmsConnection implemen
 
     /**
      * Get the consumerMaxDispatchQueueDepth
-     * 
+     *
      * @return the consumerMaxDispatchQueueDepth
      */
     public int getConsumerMaxDispatchQueueDepth() {
@@ -356,43 +297,10 @@ public class BlazeJmsConnection implemen
 
     /**
      * Set the consumerMaxDispatchQueueDepth
-     * 
-     * @param consumerMaxDispatchQueueDepth
-     *            the consumerMaxDispatchQueueDepth to set
+     *
+     * @param consumerMaxDispatchQueueDepth the consumerMaxDispatchQueueDepth to set
      */
     public void setConsumerMaxDispatchQueueDepth(int consumerMaxDispatchQueueDepth) {
         this.consumerMaxDispatchQueueDepth = consumerMaxDispatchQueueDepth;
     }
-
-    /**
-     * @param data
-     * @return a BlazeMessage
-     * @throws Exception
-     * 
-     */
-    public BlazeJmsMessage processBlazeMessage(PacketData data) throws Exception {
-        BlazeJmsMessage result = null;
-        if (data != null) {
-            DestinationData destination = data.getDestinationData();
-            Buffer payload = data.getPayload();
-            BlazeDataBuffer blazeData = BlazeDataBuffer.parseUnframed(payload);
-            String fromId = null;
-            if (data.hasProducerId()) {
-                fromId = data.getProducerId().toStringUtf8();
-            }
-            result = BlazeJmsMessageTransformation.createMessage(data.getPayloadType());
-            result.setDestination(destination);
-            result.setFromId(fromId);
-            if (data.hasMessageId()) {
-                result.setMessageId(data.getMessageId().toStringUtf8());
-            }
-            if (data.hasCorrelationId()) {
-                result.setCorrelationId(data.getCorrelationId().toStringUtf8());
-            }
-            result.setTimeStamp(blazeData.getTimestamp());
-            result.setType(data.getPayloadType());
-            result.setContent(blazeData);
-        }
-        return result;
-    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java Fri Jul 19 18:44:21 2013
@@ -16,15 +16,10 @@
  */
 package org.apache.activeblaze.jms;
 
-import org.apache.activeblaze.BlazeRuntimeException;
-import org.apache.activeblaze.group.BlazeGroupChannelFactory;
-import org.apache.activeblaze.group.BlazeGroupConfiguration;
-import org.apache.activeblaze.jndi.JNDIStorable;
-import org.apache.activeblaze.util.IdGenerator;
-import org.apache.activeblaze.util.PropertyUtil;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
@@ -32,17 +27,22 @@ import javax.jms.QueueConnection;
 import javax.jms.QueueConnectionFactory;
 import javax.jms.TopicConnection;
 import javax.jms.TopicConnectionFactory;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.group.BlazeGroupChannelFactory;
+import org.apache.activeblaze.group.BlazeGroupConfiguration;
+import org.apache.activeblaze.jndi.JNDIStorable;
+import org.apache.activeblaze.util.IdGenerator;
+import org.apache.activeblaze.util.PropertyUtil;
 
 /**
  * Jms ConnectionFactory implementation
- * 
  */
 public class BlazeJmsConnectionFactory extends JNDIStorable implements ConnectionFactory, QueueConnectionFactory,
-TopicConnectionFactory {
+        TopicConnectionFactory {
     private static final IdGenerator NAME_GENERATOR = new IdGenerator();
     private final BlazeGroupChannelFactory groupChannelFactory;
     private final Map<String, String> props = new HashMap<String, String>();
-    private int consumerMaxDispatchQueueDepth=10000;
+    private int consumerMaxDispatchQueueDepth = 10000;
 
     /**
      * Constructor
@@ -53,8 +53,6 @@ TopicConnectionFactory {
 
     /**
      * Constructor
-     * 
-     * @param config
      */
     public BlazeJmsConnectionFactory(BlazeGroupConfiguration config) {
         this.groupChannelFactory = new BlazeGroupChannelFactory(config);
@@ -69,11 +67,10 @@ TopicConnectionFactory {
 
     /**
      * Set properties
-     * @param props
      */
     public void setProperties(Properties props) {
-        Map<String,String> map = new HashMap<String, String>();
-        for (Map.Entry<Object,Object> entry: props.entrySet()) {
+        Map<String, String> map = new HashMap<String, String>();
+        for (Map.Entry<Object, Object> entry : props.entrySet()) {
             map.put(entry.getKey().toString(), entry.getValue().toString());
         }
         setProperties(map);
@@ -85,7 +82,6 @@ TopicConnectionFactory {
     }
 
     /**
-     * @param props
      * @see org.apache.activeblaze.jndi.JNDIStorable#buildFromProperties(Map<String, String> map)
      */
     @Override
@@ -95,8 +91,6 @@ TopicConnectionFactory {
     }
 
     /**
-     * 
-     * @param map
      * @see org.apache.activeblaze.jndi.JNDIStorable#populateProperties(Map<String, String> map)
      */
     @Override
@@ -114,7 +108,6 @@ TopicConnectionFactory {
 
     /**
      * @return a TopicConnection
-     * @throws JMSException
      * @see javax.jms.TopicConnectionFactory#createTopicConnection()
      */
     public TopicConnection createTopicConnection() throws JMSException {
@@ -129,10 +122,7 @@ TopicConnectionFactory {
     }
 
     /**
-     * @param userName
-     * @param password
      * @return a TopicConnection
-     * @throws JMSException
      * @see javax.jms.TopicConnectionFactory#createTopicConnection(java.lang.String, java.lang.String)
      */
     public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
@@ -148,7 +138,6 @@ TopicConnectionFactory {
 
     /**
      * @return a Connection
-     * @throws JMSException
      * @see javax.jms.ConnectionFactory#createConnection()
      */
     public Connection createConnection() throws JMSException {
@@ -163,10 +152,7 @@ TopicConnectionFactory {
     }
 
     /**
-     * @param userName
-     * @param password
      * @return Connection
-     * @throws JMSException
      * @see javax.jms.ConnectionFactory#createConnection(java.lang.String, java.lang.String)
      */
     public Connection createConnection(String userName, String password) throws JMSException {
@@ -182,7 +168,6 @@ TopicConnectionFactory {
 
     /**
      * @return a QueueConnection
-     * @throws JMSException
      * @see javax.jms.QueueConnectionFactory#createQueueConnection()
      */
     public QueueConnection createQueueConnection() throws JMSException {
@@ -197,10 +182,7 @@ TopicConnectionFactory {
     }
 
     /**
-     * @param userName
-     * @param password
      * @return a QueueConnection
-     * @throws JMSException
      * @see javax.jms.QueueConnectionFactory#createQueueConnection(java.lang.String, java.lang.String)
      */
     public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
@@ -216,6 +198,7 @@ TopicConnectionFactory {
 
     /**
      * Get the consumerMaxDispatchQueueDepth
+     *
      * @return the consumerMaxDispatchQueueDepth
      */
     public int getConsumerMaxDispatchQueueDepth() {
@@ -224,6 +207,7 @@ TopicConnectionFactory {
 
     /**
      * Set the consumerMaxDispatchQueueDepth
+     *
      * @param consumerMaxDispatchQueueDepth the consumerMaxDispatchQueueDepth to set
      */
     public void setConsumerMaxDispatchQueueDepth(int consumerMaxDispatchQueueDepth) {



Mime
View raw message