activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r719706 [5/6] - in /activemq/activemq-blaze: ./ branches/ tags/ trunk/ trunk/src/ trunk/src/main/ trunk/src/main/java/ trunk/src/main/java/org/ trunk/src/main/java/org/apache/ trunk/src/main/java/org/apache/activeblaze/ trunk/src/main/java/...
Date Fri, 21 Nov 2008 20:44:43 GMT
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.transport;
+
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketAddress;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.wire.PacketData;
+
+/**
+ * Multicast transport
+ * 
+ */
+public class MulticastTransport extends BaseTransport {
+    private MulticastSocket socket;
+    private String networkInterface;
+    private byte[] receiveData;
+    private InetSocketAddress socketAddress;
+
+    public boolean init() throws Exception {
+        boolean result = super.init();
+        if (result) {
+            this.receiveData = new byte[getMaxPacketSize()];
+            this.socket = new MulticastSocket(getLocalURI().getPort());
+            this.socket.setTimeToLive(getTimeToLive());
+            this.socket.setLoopbackMode(isLoopBack());
+            this.socket.setSoTimeout(getSoTimeout());
+            this.socket.setReceiveBufferSize(getBufferSize());
+            this.socket.setSendBufferSize(getBufferSize());
+            this.socketAddress = new InetSocketAddress(InetAddress.getByName(getLocalURI().getHost()), getLocalURI()
+                    .getPort());
+            NetworkInterface ni = null;
+            if (getNetworkInterface() != null && getNetworkInterface().length() > 0) {
+                ni = NetworkInterface.getByName(getNetworkInterface());
+                if (ni == null) {
+                    throw new BlazeException("Couldn't find an network interface named " + getNetworkInterface());
+                }
+            }
+            if (ni != null) {
+                this.socket.joinGroup(this.socketAddress, ni);
+            } else {
+                this.socket.joinGroup(this.socketAddress.getAddress());
+            }
+        }
+        return result;
+    }
+
+    public boolean shutDown() throws Exception {
+        boolean result = super.shutDown();
+        if (result) {
+            if (this.socket != null) {
+                this.socket.close();
+            }
+        }
+        return result;
+    }
+
+    protected void doProcess() throws Exception {
+        if (isInitialized()) {
+            DatagramPacket dp = new DatagramPacket(this.receiveData, this.receiveData.length);
+            this.socket.receive(dp);
+            if (dp.getLength() > 0) {
+                PacketData data = PacketData.parseFramed(dp.getData());
+                SocketAddress address = dp.getSocketAddress();
+                Packet packet = new Packet(address, data);
+                if (!isEnableAudit() || !this.audit.isDuplicate(packet)) {
+                    upStream(packet);
+                }
+            }
+        }
+    }
+
+    public void downStream(Packet packet) throws Exception {
+        if (isInitialized()) {
+            if (isEnableAudit()) {
+                // add to audit
+                this.audit.isDuplicate(packet);
+            }
+            byte[] data = packet.getPacketData().toFramedByteArray();
+            InetSocketAddress to = packet.getTo();
+            DatagramPacket dp = new DatagramPacket(data, data.length, to);
+            this.socket.send(dp);
+        } else {
+            throw new BlazeException("Not initialized");
+        }
+    }
+
+    /**
+     * @return the networkInterface
+     */
+    public String getNetworkInterface() {
+        return this.networkInterface;
+    }
+
+    /**
+     * @param networkInterface
+     *            the networkInterface to set
+     */
+    public void setNetworkInterface(String networkInterface) {
+        this.networkInterface = networkInterface;
+    }
+
+    /**
+     * join a multicast group
+     * 
+     * @param address
+     * @throws Exception
+     */
+    public void joinGroup(String address) throws Exception {
+        if (isInitialized()) {
+            InetAddress group = InetAddress.getByName(address);
+            this.socket.joinGroup(group);
+        }
+    }
+
+    /**
+     * leave a multicast group
+     * 
+     * @param address
+     * @throws Exception
+     */
+    public void leaveGroup(String address) throws Exception {
+        if (isInitialized()) {
+            InetAddress group = InetAddress.getByName(address);
+            this.socket.leaveGroup(group);
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.transport;
+
+import java.net.SocketTimeoutException;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Thread associated with processing
+ *
+ */
+public abstract class ThreadChainedProcessor extends ChainedProcessor implements Runnable {
+    private static final Log LOG = LogFactory.getLog(ThreadChainedProcessor.class);
+    private int priority=Thread.NORM_PRIORITY;
+    private boolean daemon;
+    private String name;
+    private Thread thread;
+  
+    
+    
+    public boolean start() throws Exception {
+        boolean result = super.start();
+        if(result) {
+            String name = getName() != null ? getName() : toString();
+            this.thread = new Thread(this,name);
+            this.thread.setDaemon(isDaemon());
+            this.thread.setPriority(getPriority());
+            this.thread.start();
+        }
+        return result;
+    }
+    
+    public boolean stop() throws Exception {
+        boolean result = super.stop();
+        if(result) {
+           if (this.thread != null) {
+               try {
+                   this.thread.interrupt();
+                this.thread.join();
+            } catch (InterruptedException e) {
+            }
+           }
+        }
+        return result;
+    }
+    
+    public void run() {
+        while (isStarted()) {
+            try {
+                doProcess();
+            } catch(SocketTimeoutException ste) {
+                //normal - ignore
+            }
+            catch (Exception e) {
+                try {
+                    boolean stopped = isStopped();
+                    super.stop();
+                    if (!stopped) {
+                        fireException("Failed to process", e);
+                    }
+                } catch (Exception be) {
+                    LOG.warn("Problem in stopping in run() ", be);
+                }
+            }
+        }
+    }
+    
+    /**
+     * Process input for the Processor
+     * @throws Exception
+     */
+    protected abstract void doProcess() throws Exception;
+
+    /**
+     * @return the priority
+     */
+    public int getPriority() {
+        return this.priority;
+    }
+    /**
+     * @param priority the priority to set
+     */
+    public void setPriority(int priority) {
+        this.priority = priority;
+    }
+    /**
+     * @return the daemon
+     */
+    public boolean isDaemon() {
+        return this.daemon;
+    }
+    /**
+     * @param daemon the daemon to set
+     */
+    public void setDaemon(boolean daemon) {
+        this.daemon = daemon;
+    }
+    /**
+     * @return the name
+     */
+    public String getName() {
+        return this.name;
+    }
+    /**
+     * @param name the name to set
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/ThreadChainedProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.transport;
+
+import java.net.URI;
+import java.util.Map;
+import org.apache.activeblaze.util.ObjectFinder;
+import org.apache.activeblaze.util.PropertyUtil;
+
+
+/**
+ * Find a Transport from a URI scheme
+ *
+ */
+public abstract class TransportFactory {
+
+    private static final ObjectFinder OBJECT_FINDER = new ObjectFinder("META-INF/services/org/apache/activeblaze/transport/");
+
+    /**
+     * @param location
+     * @return the configured transport from its URI
+     * @throws Exception
+     */
+    public static BaseTransport get(URI location) throws Exception {
+        BaseTransport result  = findTransport(location);
+        result.setLocalURI(location);
+        configureTransport(result, location);
+        return result;
+    }
+    
+    static void configureTransport(BaseTransport transport, URI uri) throws Exception {
+        Map<String, String> options = PropertyUtil.parseParameters(uri);
+        PropertyUtil.setProperties(transport, options);
+    }
+    
+    private static BaseTransport findTransport(URI location) throws Exception {
+    String scheme = location.getScheme();
+    if (scheme == null) {
+        throw new IllegalArgumentException("Transport scheme not specified: [" + location + "]");
+    }
+    BaseTransport result = (BaseTransport) OBJECT_FINDER.newInstance(scheme);
+    return result;
+    }
+}
\ No newline at end of file

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/TransportFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.transport;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.util.IOUtils;
+import org.apache.activeblaze.wire.PacketData;
+
+/**
+ * UdpTransport
+ * 
+ */
+public class UdpTransport extends BaseTransport {
+    private DatagramChannel channel;
+    private ByteBuffer inBuffer;
+    private ByteBuffer outBuffer;
+
+    public boolean init() throws Exception {
+        boolean result = super.init();
+        if (result) {
+            this.channel = DatagramChannel.open();
+            DatagramSocket socket = this.channel.socket();
+            SocketAddress address = null;
+            if (getLocalURI() != null) {
+                address = new InetSocketAddress(getLocalURI().getHost(), getLocalURI().getPort());
+            } else {
+                throw new BlazeException("localURI not set");
+            }
+            socket.setBroadcast(isBroadcast());
+            socket.setReceiveBufferSize(getBufferSize());
+            socket.setSendBufferSize(getBufferSize());
+            socket.setSoTimeout(getSoTimeout());
+            this.channel.configureBlocking(true);
+            socket.bind(address);
+            // if the port was 0 - the port will be allocated automatically -
+            // so need to reset the local uri
+            URI oldURI = getLocalURI();
+            URI newURI = new URI(oldURI.getScheme(), oldURI.getUserInfo(), oldURI.getHost(), socket.getLocalPort(),
+                    oldURI.getPath(), oldURI.getQuery(), oldURI.getFragment());
+            setLocalURI(newURI);
+            this.inBuffer = ByteBuffer.allocateDirect(getMaxPacketSize());
+            this.outBuffer = ByteBuffer.allocateDirect(getMaxPacketSize());
+        }
+        return result;
+    }
+
+    public boolean shutDown() throws Exception {
+        boolean result = super.shutDown();
+        if (result) {
+            if (this.channel != null) {
+                this.channel.close();
+                this.inBuffer = null;
+                this.outBuffer = null;
+                this.channel = null;
+            }
+        }
+        return result;
+    }
+
+    protected void doProcess() throws Exception {
+        this.inBuffer.clear();
+        SocketAddress address = this.channel.receive(this.inBuffer);
+        ByteBuffer buffer = this.inBuffer;
+        if (isInitialized()) {
+            buffer.flip();
+            if (buffer.remaining() > 0) {
+                InputStream stream = IOUtils.getByteBufferInputStream(buffer);
+                PacketData data = PacketData.parseFramed(stream);
+                stream.close();
+                Packet packet = new Packet(address, data);
+                if (!isEnableAudit() || !this.audit.isDuplicate(packet)) {
+                    upStream(packet);
+                }
+            }
+            buffer.clear();
+        }
+    }
+
+    public void downStream(Packet packet) throws Exception {
+        ByteBuffer buffer = this.outBuffer;
+        if (isStarted()) {
+            buffer.clear();
+            OutputStream stream = IOUtils.getByteBufferOutputStream(buffer);
+            if (isEnableAudit()) {
+                // add to audit
+                this.audit.isDuplicate(packet);
+            }
+            packet.getPacketData().writeFramed(stream);
+            stream.close();
+            buffer.flip();
+            this.channel.send(buffer, packet.getTo());
+        } else {
+            throw new BlazeException("Not started");
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/package.html?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/package.html (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/package.html Fri Nov 21 12:44:40 2008
@@ -0,0 +1,25 @@
+!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Transports for communication
+
+</body>
+</html>
\ No newline at end of file

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/package.html?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/package.html (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/package.html Fri Nov 21 12:44:40 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+ultra fast reliable messaging
+
+</body>
+</html>
\ No newline at end of file

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BitArray.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BitArray.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BitArray.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BitArray.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Simple BitArray to enable setting multiple boolean values efficently Used
+ * instead of BitSet because BitSet does not allow for efficent serialization.
+ * Will store up to 64 boolean values
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class BitArray {
+    static final int LONG_SIZE = 64;
+    static final int INT_SIZE = 32;
+    static final int SHORT_SIZE = 16;
+    static final int BYTE_SIZE = 8;
+    private static final long[] BIT_VALUES = {0x0000000000000001L, 0x0000000000000002L, 0x0000000000000004L,
+                                              0x0000000000000008L, 0x0000000000000010L, 0x0000000000000020L,
+                                              0x0000000000000040L, 0x0000000000000080L, 0x0000000000000100L,
+                                              0x0000000000000200L, 0x0000000000000400L, 0x0000000000000800L,
+                                              0x0000000000001000L, 0x0000000000002000L, 0x0000000000004000L,
+                                              0x0000000000008000L, 0x0000000000010000L, 0x0000000000020000L,
+                                              0x0000000000040000L, 0x0000000000080000L, 0x0000000000100000L,
+                                              0x0000000000200000L, 0x0000000000400000L, 0x0000000000800000L,
+                                              0x0000000001000000L, 0x0000000002000000L, 0x0000000004000000L,
+                                              0x0000000008000000L, 0x0000000010000000L, 0x0000000020000000L,
+                                              0x0000000040000000L, 0x0000000080000000L, 0x0000000100000000L,
+                                              0x0000000200000000L, 0x0000000400000000L, 0x0000000800000000L,
+                                              0x0000001000000000L, 0x0000002000000000L, 0x0000004000000000L,
+                                              0x0000008000000000L, 0x0000010000000000L, 0x0000020000000000L,
+                                              0x0000040000000000L, 0x0000080000000000L, 0x0000100000000000L,
+                                              0x0000200000000000L, 0x0000400000000000L, 0x0000800000000000L,
+                                              0x0001000000000000L, 0x0002000000000000L, 0x0004000000000000L,
+                                              0x0008000000000000L, 0x0010000000000000L, 0x0020000000000000L,
+                                              0x0040000000000000L, 0x0080000000000000L, 0x0100000000000000L,
+                                              0x0200000000000000L, 0x0400000000000000L, 0x0800000000000000L,
+                                              0x1000000000000000L, 0x2000000000000000L, 0x4000000000000000L,
+                                              0x8000000000000000L};
+    private long bits;
+    private int length;
+
+    /**
+     * @return the length of bits set
+     */
+    public int length() {
+        return this.length;
+    }
+
+    /**
+     * @return the long containing the bits
+     */
+    public long getBits() {
+        return this.bits;
+    }
+
+    /**
+     * set the boolean value at the index
+     * 
+     * @param index
+     * @param flag
+     * @return the old value held at this index
+     */
+    public boolean set(int index, boolean flag) {
+        this.length = Math.max(this.length, index + 1);
+        boolean oldValue = (this.bits & BIT_VALUES[index]) != 0;
+        if (flag) {
+            this.bits |= BIT_VALUES[index];
+        } else if (oldValue) {
+            this.bits &= ~(BIT_VALUES[index]);
+        }
+        return oldValue;
+    }
+
+    /**
+     * @param index
+     * @return the boolean value at this index
+     */
+    public boolean get(int index) {
+        return (this.bits & BIT_VALUES[index]) != 0;
+    }
+
+    /**
+     * reset all the bit values to false
+     */
+    public void reset() {
+        this.bits = 0;
+    }
+
+    /**
+     * reset all the bits to the value supplied
+     * 
+     * @param bits
+     */
+    public void reset(long bits) {
+        this.bits = bits;
+    }
+
+    /**
+     * write the bits to an output stream
+     * 
+     * @param dataOut
+     * @throws IOException
+     */
+    public void writeToStream(DataOutput dataOut) throws IOException {
+        dataOut.writeByte(this.length);
+        if (this.length <= BYTE_SIZE) {
+            dataOut.writeByte((int)this.bits);
+        } else if (this.length <= SHORT_SIZE) {
+            dataOut.writeShort((short)this.bits);
+        } else if (this.length <= INT_SIZE) {
+            dataOut.writeInt((int)this.bits);
+        } else {
+            dataOut.writeLong(this.bits);
+        }
+    }
+
+    /**
+     * read the bits from an input stream
+     * 
+     * @param dataIn
+     * @throws IOException
+     */
+    public void readFromStream(DataInput dataIn) throws IOException {
+        this.length = dataIn.readByte();
+        if (this.length <= BYTE_SIZE) {
+            this.bits = dataIn.readByte();
+        } else if (this.length <= SHORT_SIZE) {
+            this.bits = dataIn.readShort();
+        } else if (this.length <= INT_SIZE) {
+            this.bits = dataIn.readInt();
+        } else {
+            this.bits = dataIn.readLong();
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BitArray.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BitArray.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BitArrayBin.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BitArrayBin.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BitArrayBin.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BitArrayBin.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+import java.util.LinkedList;
+
+/**
+ * Holder for many bitArrays - used for message audit
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class BitArrayBin {
+
+    private LinkedList<BitArray> list;
+    private int maxNumberOfArrays;
+    private int firstIndex = -1;
+    private long lastInOrderBit=-1;
+
+    /**
+     * Create a BitArrayBin to a certain window size (number of messages to
+     * keep)
+     * 
+     * @param windowSize
+     */
+    public BitArrayBin(int windowSize) {
+        this.maxNumberOfArrays = ((windowSize + 1) / BitArray.LONG_SIZE) + 1;
+        this.maxNumberOfArrays = Math.max(this.maxNumberOfArrays, 1);
+        this.list = new LinkedList<BitArray>();
+        for (int i = 0; i < this.maxNumberOfArrays; i++) {
+            this.list.add(null);
+        }
+    }
+
+    /**
+     * Set a bit
+     * 
+     * @param index
+     * @param value
+     * @return true if set
+     */
+    public boolean setBit(long index, boolean value) {
+        boolean answer = false;
+        BitArray ba = getBitArray(index);
+        if (ba != null) {
+            int offset = getOffset(index);
+            if (offset >= 0) {
+                answer = ba.set(offset, value);
+            }
+        }
+        return answer;
+    }
+    
+    /**
+     * Test if in order
+     * @param index
+     * @return true if next message is in order
+     */
+    public boolean isInOrder(long index) {
+        boolean result = false;
+        if (this.lastInOrderBit == -1) {
+            result = true;
+        } else {
+            result = this.lastInOrderBit + 1 == index;
+        }
+        this.lastInOrderBit = index;
+        return result;
+
+    }
+
+    /**
+     * Get the boolean value at the index
+     * 
+     * @param index
+     * @return true/false
+     */
+    public boolean getBit(long index) {
+        boolean answer = index >= this.firstIndex;
+        BitArray ba = getBitArray(index);
+        if (ba != null) {
+            int offset = getOffset(index);
+            if (offset >= 0) {
+                answer = ba.get(offset);
+                return answer;
+            }
+        } else {
+            // gone passed range for previous bins so assume set
+            answer = true;
+        }
+        return answer;
+    }
+
+    /**
+     * Get the BitArray for the index
+     * 
+     * @param index
+     * @return BitArray
+     */
+    private BitArray getBitArray(long index) {
+        int bin = getBin(index);
+        BitArray answer = null;
+        if (bin >= 0) {
+            if (bin >= this.maxNumberOfArrays) {
+                int overShoot = bin - maxNumberOfArrays + 1;
+                while (overShoot > 0) {
+                    this.list.removeFirst();
+                    this.firstIndex += BitArray.LONG_SIZE;
+                    this.list.add(new BitArray());
+                    overShoot--;
+                }
+                
+                bin = this.maxNumberOfArrays - 1;
+            }
+            answer = this.list.get(bin);
+            if (answer == null) {
+                answer = new BitArray();
+                this.list.set(bin, answer);
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * Get the index of the bin from the total index
+     * 
+     * @param index
+     * @return the index of the bin
+     */
+    private int getBin(long index) {
+        int answer = 0;
+        if (this.firstIndex < 0) {
+            this.firstIndex = (int) (index - (index % BitArray.LONG_SIZE));
+        } else if (this.firstIndex >= 0) {
+            answer = (int)((index - this.firstIndex) / BitArray.LONG_SIZE);
+        }
+        return answer;
+    }
+
+    /**
+     * Get the offset into a bin from the total index
+     * 
+     * @param index
+     * @return the relative offset into a bin
+     */
+    private int getOffset(long index) {
+        int answer = 0;
+        if (firstIndex >= 0) {
+            answer = (int)((index - firstIndex) - (BitArray.LONG_SIZE * getBin(index)));
+        }
+        return answer;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BitArrayBin.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BitArrayBin.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities for ByteBuffers
+ * 
+ */
+public class IOUtils {
+    /**
+     * Create an InputStream to read a ByteBuffer
+     * 
+     * @param buf
+     * @return
+     */
+    public static InputStream getByteBufferInputStream(final ByteBuffer buf) {
+        return new InputStream() {
+            public int read() throws IOException {
+                if (!buf.hasRemaining()) {
+                    return -1;
+                }
+                return buf.get();
+            }
+
+            public int read(byte[] bytes, int off, int len) throws IOException {
+                int toWrite = Math.min(len, buf.remaining());
+                buf.get(bytes, off, toWrite);
+                return len;
+            }
+        };
+    }
+
+    /**
+     * Create an OutputStream for a ByteBuffer
+     * 
+     * @param buf
+     * @return
+     */
+    public static OutputStream getByteBufferOutputStream(final ByteBuffer buf) {
+        return new OutputStream() {
+            public void write(int b) throws IOException {
+                buf.put((byte) b);
+            }
+
+            public void write(byte[] bytes, int off, int len) throws IOException {
+                buf.put(bytes, off, len);
+            }
+        };
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Generator for Globally unique Strings.
+ */
+
+public class IdGenerator {
+
+    private static final Logger LOG = Logger.getLogger(IdGenerator.class.getName());
+    private static final String UNIQUE_STUB;
+    private static int instanceCount;
+    private static String hostName;
+    private String seed;
+    private long sequence;
+
+    static {
+        String stub = "";
+        boolean canAccessSystemProps = true;
+        try {
+            SecurityManager sm = System.getSecurityManager();
+            if (sm != null) {
+                sm.checkPropertiesAccess();
+            }
+        } catch (SecurityException se) {
+            canAccessSystemProps = false;
+        }
+
+        if (canAccessSystemProps) {
+            try {
+                hostName = InetAddress.getLocalHost().getHostName();
+                ServerSocket ss = new ServerSocket(0);
+                stub = "-" + ss.getLocalPort() + "-" + System.currentTimeMillis() + "-";
+                Thread.sleep(100);
+                ss.close();
+            } catch (Exception ioe) {
+                LOG.log(Level.WARNING, "could not generate unique stub", ioe);
+            }
+        } else {
+            hostName = "localhost";
+            stub = "-1-" + System.currentTimeMillis() + "-";
+        }
+        UNIQUE_STUB = stub;
+    }
+
+    /**
+     * Construct an IdGenerator
+     */
+    public IdGenerator(String prefix) {
+        synchronized (UNIQUE_STUB) {
+            this.seed = prefix + UNIQUE_STUB + (instanceCount++) + ":";
+        }
+    }
+
+    public IdGenerator() {
+        this("ID:" + hostName);
+    }
+
+    /**
+     * As we have to find the hostname as a side-affect of generating a unique
+     * stub, we allow it's easy retrevial here
+     * 
+     * @return the local host name
+     */
+
+    public static String getHostName() {
+        return hostName;
+    }
+
+
+    /**
+     * Generate a unqiue id
+     * 
+     * @return a unique id
+     */
+
+    public synchronized String generateId() {
+        return this.seed + (this.sequence++);
+    }
+
+    /**
+     * Generate a unique ID - that is friendly for a URL or file system
+     * 
+     * @return a unique id
+     */
+    public String generateSanitizedId() {
+        String result = generateId();
+        result = result.replace(':', '-');
+        result = result.replace('_', '-');
+        result = result.replace('.', '-');
+        return result;
+    }
+
+    /**
+     * From a generated id - return the seed (i.e. minus the count)
+     * 
+     * @param id the generated identifer
+     * @return the seed
+     */
+    public static String getSeedFromId(String id) {
+        String result = id;
+        if (id != null) {
+            int index = id.lastIndexOf(':');
+            if (index > 0 && (index + 1) < id.length()) {
+                result = id.substring(0, index + 1);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * From a generated id - return the generator count
+     * 
+     * @param id
+     * @return the count
+     */
+    public static long getSequenceFromId(String id) {
+        long result = -1;
+        if (id != null) {
+            int index = id.lastIndexOf(':');
+
+            if (index > 0 && (index + 1) < id.length()) {
+                String numStr = id.substring(index + 1, id.length());
+                result = Long.parseLong(numStr);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Does a proper compare on the ids
+     * 
+     * @param id1
+     * @param id2
+     * @return 0 if equal else a positive if id1 is > id2 ...
+     */
+
+    public static int compare(String id1, String id2) {
+        int result = -1;
+        String seed1 = IdGenerator.getSeedFromId(id1);
+        String seed2 = IdGenerator.getSeedFromId(id2);
+        if (seed1 != null && seed2 != null) {
+            result = seed1.compareTo(seed2);
+            if (result == 0) {
+                long count1 = IdGenerator.getSequenceFromId(id1);
+                long count2 = IdGenerator.getSequenceFromId(id2);
+                result = (int)(count1 - count2);
+            }
+        }
+        return result;
+
+    }
+
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ObjectFinder.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ObjectFinder.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ObjectFinder.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ObjectFinder.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Find and build an object from a location
+ *
+ */
+public class ObjectFinder {
+
+    private final String path;
+    private final Map<String, Class<?>> classMap = new ConcurrentHashMap<String, Class<?>>();
+
+    /**
+     * Constructor
+     * @param path
+     */
+    public ObjectFinder(String path) {
+        this.path = path;
+    }
+
+    /**
+     * Creates a new instance of the given key
+     * 
+     * @param key is the key to add to the path to find a text file containing
+     *                the factory name
+     * @return a newly created instance
+     * @throws Exception 
+     */
+    public Object newInstance(String key) throws Exception {
+        return newInstance(key, null);
+    }
+
+    /**
+     * @param key
+     * @param propertyPrefix
+     * @return
+     * @throws Exception
+     */
+    public Object newInstance(String key, String propertyPrefix) throws Exception {
+        if (propertyPrefix == null) {
+            propertyPrefix = "";
+        }
+
+        Class<?> clazz = this.classMap.get(propertyPrefix + key);
+        if (clazz == null) {
+            clazz = newInstance(doFindFactoryProperies(key), propertyPrefix);
+            this.classMap.put(propertyPrefix + key, clazz);
+        }
+        return clazz.newInstance();
+    }
+
+    private Class<?> newInstance(Properties properties, String propertyPrefix) throws ClassNotFoundException, IOException {
+
+        String className = properties.getProperty(propertyPrefix + "class");
+        if (className == null) {
+            throw new IOException("Expected property is missing: " + propertyPrefix + "class");
+        }
+        Class<?> clazz = null;
+        ClassLoader loader = Thread.currentThread().getContextClassLoader();
+        if (loader != null) {
+            try {
+                clazz = loader.loadClass(className);
+            } catch (ClassNotFoundException e) {
+                // ignore
+            }
+        }
+        if (clazz == null) {
+            clazz = ObjectFinder.class.getClassLoader().loadClass(className);
+        }
+
+        return clazz;
+    }
+
+    private Properties doFindFactoryProperies(String key) throws IOException {
+        String uri = this.path + key;
+
+        // lets try the thread context class loader first
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        if (classLoader == null) {
+            classLoader = getClass().getClassLoader();
+        }
+        InputStream in = classLoader.getResourceAsStream(uri);
+        if (in == null) {
+            in = ObjectFinder.class.getClassLoader().getResourceAsStream(uri);
+            if (in == null) {
+                throw new IOException("Could not find class from resource: " + uri);
+            }
+        }
+
+        // lets load the file
+        BufferedInputStream reader = null;
+        try {
+            reader = new BufferedInputStream(in);
+            Properties properties = new Properties();
+            properties.load(reader);
+            return properties;
+        } finally {
+            try {
+                reader.close();
+            } catch (Exception e) {
+            }
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/ObjectFinder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/PropertyUtil.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/PropertyUtil.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/PropertyUtil.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/PropertyUtil.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+import java.beans.BeanInfo;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.beans.PropertyEditor;
+import java.beans.PropertyEditorManager;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utilities for properties
+ * 
+ */
+public class PropertyUtil {
+    /**
+     * Get properties from a URI
+     * 
+     * @param uri
+     * @return <Code>Map</Code> of properties
+     * @throws Exception
+     * 
+     */
+    public static Map<String, String> parseParameters(URI uri) throws Exception {
+        return uri.getQuery() == null ? Collections.EMPTY_MAP : parseQuery(stripPrefix(uri.getQuery(), "?"));
+    }
+
+    /**
+     * Parse properties from a named resource -eg. a URI or a simple name e.g. foo?name="fred"&size=2
+     * 
+     * @param uri
+     * @return <Code>Map</Code> of properties
+     * @throws Exception
+     * 
+     */
+    public static Map<String, String> parseParameters(String uri) throws Exception {
+        return uri == null ? Collections.EMPTY_MAP : parseQuery(stripUpto(uri, '?'));
+    }
+
+    /**
+     * Get properties from a uri
+     * 
+     * @param uri
+     * @return <Code>Map</Code> of properties
+     * @throws Exception
+     */
+    public static Map<String, String> parseQuery(String uri) throws Exception {
+        if (uri != null) {
+            Map<String, String> rc = new HashMap<String, String>();
+            if (uri != null) {
+                String[] parameters = uri.split("&");
+                for (int i = 0; i < parameters.length; i++) {
+                    int p = parameters[i].indexOf("=");
+                    if (p >= 0) {
+                        String name = URLDecoder.decode(parameters[i].substring(0, p), "UTF-8");
+                        String value = URLDecoder.decode(parameters[i].substring(p + 1), "UTF-8");
+                        rc.put(name, value);
+                    } else {
+                        rc.put(parameters[i], null);
+                    }
+                }
+            }
+            return rc;
+        }
+        return Collections.EMPTY_MAP;
+    }
+
+    /**
+     * Add bean properties to a URI
+     * 
+     * @param uri
+     * @param bean
+     * @return <Code>Map</Code> of properties
+     * @throws Exception
+     */
+    public static String addPropertiesToURIFromBean(String uri, Object bean) throws Exception {
+        Map<String, String> props = PropertyUtil.getProperties(bean);
+        return PropertyUtil.addPropertiesToURI(uri, props);
+    }
+
+    /**
+     * Add properties to a URI
+     * 
+     * @param uri
+     * @param props
+     * @return uri with properties on
+     * @throws Exception
+     * 
+     */
+    public static String addPropertiesToURI(String uri, Map<String, String> props) throws Exception {
+        String result = uri;
+        if (uri != null && props != null) {
+            StringBuilder base = new StringBuilder(stripBefore(uri, '?'));
+            Map<String, String> map = parseParameters(uri);
+            if (!map.isEmpty()) {
+                map.putAll(props);
+            }
+            if (!map.isEmpty()) {
+                base.append('?');
+                boolean first = true;
+                for (Map.Entry<String, String> entry : map.entrySet()) {
+                    if (!first) {
+                        base.append('&');
+                    }
+                    first = false;
+                    base.append(entry.getKey()).append("=").append(entry.getValue());
+                }
+                result = base.toString();
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Set properties on an object
+     * 
+     * @param target
+     * @param props
+     */
+    public static void setProperties(Object target, Map<String, String> props) {
+        if (target == null) {
+            throw new IllegalArgumentException("target was null.");
+        }
+        if (props == null) {
+            throw new IllegalArgumentException("props was null.");
+        }
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+            if (setProperty(target, (String) entry.getKey(), entry.getValue())) {
+            }
+        }
+    }
+
+    /**
+     * Get properties from an object
+     * 
+     * @param object
+     * @return <Code>Map</Code> of properties
+     * @throws Exception
+     */
+    public static Map<String, String> getProperties(Object object) throws Exception {
+        Map<String, String> props = new HashMap<String, String>();
+        BeanInfo beanInfo = Introspector.getBeanInfo(object.getClass());
+        Object[] NULL_ARG = {};
+        PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
+        if (propertyDescriptors != null) {
+            for (int i = 0; i < propertyDescriptors.length; i++) {
+                PropertyDescriptor pd = propertyDescriptors[i];
+                if (pd.getReadMethod() != null && !pd.getName().equals("class")) {
+                    props.put(pd.getName(), ("" + pd.getReadMethod().invoke(object, NULL_ARG)));
+                }
+            }
+        }
+        return props;
+    }
+
+    /**
+     * Set a property
+     * 
+     * @param target
+     * @param name
+     * @param value
+     * @return true if set
+     */
+    public static boolean setProperty(Object target, String name, Object value) {
+        try {
+            Class<? extends Object> clazz = target.getClass();
+            Method setter = findSetterMethod(clazz, name);
+            if (setter == null) {
+                return false;
+            }
+            // If the type is null or it matches the needed type, just use the
+            // value directly
+            if (value == null || value.getClass() == setter.getParameterTypes()[0]) {
+                setter.invoke(target, new Object[] { value });
+            } else {
+                // We need to convert it
+                setter.invoke(target, new Object[] { convert(value, setter.getParameterTypes()[0]) });
+            }
+            return true;
+        } catch (Throwable ignore) {
+            return false;
+        }
+    }
+
+    /**
+     * Return a String past a prefix
+     * 
+     * @param value
+     * @param prefix
+     * @return stripped
+     */
+    public static String stripPrefix(String value, String prefix) {
+        if (value.startsWith(prefix)) {
+            return value.substring(prefix.length());
+        }
+        return value;
+    }
+
+    /**
+     * Return a String from to a character
+     * 
+     * @param value
+     * @param c
+     * @return stripped
+     */
+    public static String stripUpto(String value, char c) {
+        String result = null;
+        int index = value.indexOf(c);
+        if (index > 0) {
+            result = value.substring(index + 1);
+        }
+        return result;
+    }
+
+    /**
+     * Return a String up to and including character
+     * 
+     * @param value
+     * @param c
+     * @return stripped
+     */
+    public static String stripBefore(String value, char c) {
+        String result = value;
+        int index = value.indexOf(c);
+        if (index > 0) {
+            result = value.substring(0, index);
+        }
+        return result;
+    }
+
+    private static Method findSetterMethod(Class<? extends Object> clazz, String name) {
+        // Build the method name.
+        name = "set" + name.substring(0, 1).toUpperCase() + name.substring(1);
+        Method[] methods = clazz.getMethods();
+        for (int i = 0; i < methods.length; i++) {
+            Method method = methods[i];
+            Class<? extends Object> params[] = method.getParameterTypes();
+            if (method.getName().equals(name) && params.length == 1) {
+                return method;
+            }
+        }
+        return null;
+    }
+
+    private static Object convert(Object value, Class<?> type) throws Exception {
+        PropertyEditor editor = PropertyEditorManager.findEditor(type);
+        if (editor != null) {
+            editor.setAsText(value.toString());
+            return editor.getValue();
+        }
+        if (type == URI.class) {
+            return new URI(value.toString());
+        }
+        return null;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/PropertyUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/package.html?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/package.html (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/package.html Fri Nov 21 12:44:40 2008
@@ -0,0 +1,25 @@
+!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+utility classes
+
+</body>
+</html>
\ No newline at end of file

Added: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (added)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Fri Nov 21 12:44:40 2008
@@ -0,0 +1,158 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.activeblaze.wire;
+
+option java_multiple_files = true;
+option optimize_for = SPEED;
+
+
+
+// We make use of the wonky comment style bellow because the following options
+// are not valid for protoc, but they are valid for the ActiveMQ proto compiler.
+// In the ActiveMQ proto compiler, comments terminate with the pipe character: |
+  
+enum MessageType {
+  //| option java_create_message="true";
+  BLAZE_DATA = 0;
+  MEMBER_DATA = 1;
+  ELECTION_MESSAGE = 2;
+}
+    message PacketData {   
+      optional int32 type =1;  
+	    optional bytes producerId = 2;
+	    optional bytes fromAddress =3;
+	    optional int64 sessionId = 4;
+      optional int64 messageSequence = 5;
+      optional bool reliable = 6;
+      optional int32 numberOfParts= 7;
+      optional int32 partNumber= 8;
+      optional bytes payload= 9;
+      optional bytes messageId =10;
+      optional bytes correlationId = 11;
+	  
+    }
+    message DestinationData {
+      required bool topic = 1;
+      required bytes destination = 2;
+    }
+    message MemberData {
+       //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
+       //| option java_type_method = "MessageType";
+       optional string id = 1;
+       optional string name = 2;
+       optional int64 startTime = 3;
+       optional int64 timeStamp = 4;
+       optional bytes inetAddress = 5;
+       optional int32 port = 6;
+       optional int64 coordinatorWeight = 7;
+       optional bool  destinationsChanged = 8;
+       repeated DestinationData  destination = 9; 
+    }
+    
+    enum ElectionType {
+    ELECTION = 0;
+    ANSWER = 1;
+    COORDINATOR = 2;
+  }
+    message ElectionMessage {
+    //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
+       //| option java_type_method = "MessageType";
+       optional MemberData member = 1;
+       optional ElectionType electionType = 2;
+    }
+   
+    
+    // Properties
+    message StringType {
+      optional string name = 1;
+      optional string value = 2;
+    }   
+    
+    message BoolType {
+      optional string name = 1;
+      optional bool value = 2;
+    }
+    
+    message ByteType {
+      optional string name = 1;
+      optional int32 value = 2;
+    }
+    
+    message ShortType {
+      optional string name = 1;
+      optional int32 value = 2;
+    }
+    
+    message IntType {
+      optional string name = 1;
+      optional int32 value = 2;
+    }
+    
+    message LongType {
+      optional string name = 1;
+      optional int64 value = 2;
+    }
+    
+    message FloatType {
+      optional string name = 1;
+      optional float value = 2;
+    }
+    
+    message DoubleType {
+      optional string name = 1;
+      optional double value = 2;
+    }
+
+    message CharType {
+	  optional string name = 1;
+	  optional string value = 2;
+	}
+	
+	message BytesType {
+	  optional string name = 1;
+	  optional bytes value = 2;
+	}
+	
+	
+    
+    message MapData {
+	  optional string name=1[default = "DEFAULT"];
+      repeated StringType stringType = 2;
+      repeated IntType intType = 3;
+      repeated BoolType boolType = 4;
+      repeated LongType longType = 5;
+      repeated DoubleType doubleType = 6;
+      repeated FloatType floatType = 7;
+      repeated ShortType shortType = 8;
+      repeated ByteType byteType = 9;
+ 	    repeated CharType charType = 10;
+	    repeated BytesType  bytesType = 11;
+      repeated MapData  mapType = 12;
+      
+    }
+    
+    message BlazeData {
+      //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
+    //| option java_type_method = "MessageType";
+      optional bool topic = 1 [default = true];
+      optional bytes Destination = 2[default = "test.topic"];    
+      optional int64 timestamp = 3;
+      optional MapData mapData = 4;
+    }
+
+	
+        
\ No newline at end of file

Added: activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/reliable/simple
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/reliable/simple?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/reliable/simple (added)
+++ activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/reliable/simple Fri Nov 21 12:44:40 2008
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activeblaze.impl.reliable.simple.SimpleReliableProcessor

Added: activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/transport/mcast
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/transport/mcast?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/transport/mcast (added)
+++ activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/transport/mcast Fri Nov 21 12:44:40 2008
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activeblaze.impl.transport.MulticastTransport

Added: activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/transport/multicast
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/transport/multicast?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/transport/multicast (added)
+++ activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/transport/multicast Fri Nov 21 12:44:40 2008
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activeblaze.impl.transport.MulticastTransport

Added: activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/transport/udp
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/transport/udp?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/transport/udp (added)
+++ activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/transport/udp Fri Nov 21 12:44:40 2008
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activeblaze.impl.transport.UdpTransport

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import junit.framework.TestCase;
+
+/**
+ * Basic test for BlazeChannel
+ * 
+ */
+public class BlazeChannelTest extends TestCase {
+    public void testChannel() throws Exception {
+        int count = 100;
+        final AtomicInteger received = new AtomicInteger();
+        String destination = "test.foo";
+        BlazeChannelFactory factory = new BlazeChannelFactory();
+        BlazeChannel sender = factory.createChannel();
+        BlazeChannel receiver = factory.createChannel();
+        receiver.getConfiguration().setUseDispatchThread(true);
+        sender.start();
+        receiver.start();
+        final CountDownLatch latch = new CountDownLatch(count);
+        receiver.addBlazeTopicMessageListener(destination, new BlazeTopicListener() {
+            public void onMessage(BlazeMessage message) {
+                message.size();
+                received.incrementAndGet();
+                latch.countDown();
+            }
+        });
+        BlazeMessage msg = new BlazeMessage();
+        msg.setText("value");
+        for (int i = 0; i < count; i++) {
+            sender.broadcast(destination, msg);
+        }
+        latch.await(10, TimeUnit.SECONDS);
+        receiver.stop();
+        sender.stop();
+        assertEquals("Not enough messages", 0, latch.getCount());
+    }
+
+    public void testGroupBroadcast() throws Exception {
+        final int number = 10;
+        String destination = "test.foo";
+        final AtomicInteger count = new AtomicInteger();
+        List<BlazeChannel> channels = new ArrayList<BlazeChannel>();
+        BlazeChannelFactory factory = new BlazeChannelFactory();
+        for (int i = 0; i < number; i++) {
+            BlazeChannel channel = factory.createChannel();
+            channel.start();
+            channels.add(channel);
+            channel.addBlazeTopicMessageListener(destination, new BlazeTopicListener() {
+                public void onMessage(BlazeMessage message) {
+                    synchronized (count) {
+                        if (count.incrementAndGet() == number) {
+                            count.notifyAll();
+                        }
+                    }
+                }
+            });
+        }
+        BlazeMessage msg = new BlazeMessage();
+        msg.setText("hello");
+        channels.get(0).broadcast(destination, msg);
+        synchronized (count) {
+            if (count.get() < number) {
+                count.wait(5000);
+            }
+        }
+       
+        assertEquals(number, count.get());
+        for (BlazeChannel channel : channels) {
+            channel.shutDown();
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeMessageTest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeMessageTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeMessageTest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.List;
+import junit.framework.TestCase;
+import org.apache.activeblaze.BlazeMessage;
+
+/**
+ * Test BlazeMessage
+ * 
+ */
+public class BlazeMessageTest extends TestCase {
+    private final String NAME = "testName";
+
+    public void testBytesConversion() throws Exception {
+        BlazeMessage msg = new BlazeMessage();
+        msg.setBoolean("boolean", true);
+        msg.setByte("byte", (byte) 1);
+        msg.setBytes("bytes", new byte[1]);
+        msg.setChar("char", 'a');
+        msg.setDouble("double", 1.5);
+        msg.setFloat("float", 1.5f);
+        msg.setInt("int", 1);
+        msg.setLong("long", 1);
+        msg.put("object", "stringObj");
+        msg.setShort("short", (short) 1);
+        msg.setString("string", "string");
+        // Test with a 1Meg String
+        StringBuffer bigSB = new StringBuffer(1024 * 1024);
+        for (int i = 0; i < 1024 * 1024; i++) {
+            bigSB.append((char) 'a' + i % 26);
+        }
+        String bigString = bigSB.toString();
+        msg.setString("bigString", bigString);
+        msg = msg.copy();
+        assertEquals(msg.getBoolean("boolean"), true);
+        assertEquals(msg.getByte("byte"), (byte) 1);
+        assertEquals(msg.getBytes("bytes").length, 1);
+        assertEquals(msg.getChar("char"), 'a');
+        assertEquals(msg.getDouble("double"), 1.5, 0);
+        assertEquals(msg.getFloat("float"), 1.5f, 0);
+        assertEquals(msg.getInt("int"), 1);
+        assertEquals(msg.getLong("long"), 1);
+        assertEquals(msg.getObject("object"), "stringObj");
+        assertEquals(msg.getShort("short"), (short) 1);
+        assertEquals(msg.getString("string"), "string");
+        assertEquals(msg.getString("bigString"), bigString);
+    }
+
+    public void testGetBoolean() throws Exception {
+        BlazeMessage msg = new BlazeMessage();
+        msg.setBoolean(NAME, true);
+        assertTrue(msg.getBoolean(NAME));
+        msg.clear();
+        msg.setString(NAME, "true");
+        msg = msg.copy();
+        assertTrue(msg.getBoolean(NAME));
+    }
+
+    public void testGetByte() throws Exception {
+        BlazeMessage msg = new BlazeMessage();
+        msg.setByte(this.NAME, (byte) 1);
+        msg = msg.copy();
+        assertTrue(msg.getByte(this.NAME) == (byte) 1);
+    }
+
+    public void testGetShort() throws Exception {
+        BlazeMessage msg = new BlazeMessage();
+        msg.setShort(this.NAME, (short) 1);
+        msg = msg.copy();
+        assertTrue(msg.getShort(this.NAME) == (short) 1);
+    }
+
+    public void testGetChar() throws Exception {
+        BlazeMessage msg = new BlazeMessage();
+        msg.setChar(this.NAME, 'a');
+        msg = msg.copy();
+        assertTrue(msg.getChar(this.NAME) == 'a');
+    }
+
+    public void testGetInt() throws Exception {
+        BlazeMessage msg = new BlazeMessage();
+        msg.setInt(this.NAME, 1);
+        msg = msg.copy();
+        assertTrue(msg.getInt(this.NAME) == 1);
+    }
+
+    public void testGetLong() throws Exception {
+        BlazeMessage msg = new BlazeMessage();
+        msg.setLong(this.NAME, 1);
+        msg = msg.copy();
+        assertTrue(msg.getLong(this.NAME) == 1);
+    }
+
+    public void testGetFloat() throws Exception {
+        BlazeMessage msg = new BlazeMessage();
+        msg.setFloat(this.NAME, 1.5f);
+        msg = msg.copy();
+        assertTrue(msg.getFloat(this.NAME) == 1.5f);
+    }
+
+    public void testGetDouble() throws Exception {
+        BlazeMessage msg = new BlazeMessage();
+        msg.setDouble(this.NAME, 1.5);
+        msg = msg.copy();
+        assertTrue(msg.getDouble(this.NAME) == 1.5);
+    }
+
+    public void testGetString() throws Exception {
+        BlazeMessage msg = new BlazeMessage();
+        String str = "test";
+        msg.setString(this.NAME, str);
+        msg = msg.copy();
+        assertEquals(msg.getString(this.NAME), str);
+    }
+
+    public void testGetBytes() throws Exception {
+        BlazeMessage msg = new BlazeMessage();
+        byte[] bytes1 = new byte[3];
+        byte[] bytes2 = new byte[2];
+        System.arraycopy(bytes1, 0, bytes2, 0, 2);
+        msg.setBytes(this.NAME, bytes1);
+        msg.setBytes(this.NAME + "2", bytes1, 0, 2);
+        msg = msg.copy();
+        assertTrue(Arrays.equals(msg.getBytes(this.NAME), bytes1));
+        assertEquals(msg.getBytes(this.NAME + "2").length, bytes2.length);
+    }
+
+    public void testGetObject() throws Exception {
+        BlazeMessage msg = new BlazeMessage();
+        Boolean booleanValue = Boolean.TRUE;
+        Byte byteValue = Byte.valueOf("1");
+        byte[] bytesValue = new byte[3];
+        Character charValue = new Character('a');
+        Double doubleValue = Double.valueOf("1.5");
+        Float floatValue = Float.valueOf("1.5");
+        Integer intValue = Integer.valueOf("1");
+        Long longValue = Long.valueOf("1");
+        Short shortValue = Short.valueOf("1");
+        String stringValue = "string";
+        msg.put("boolean", booleanValue);
+        msg.put("byte", byteValue);
+        msg.put("bytes", bytesValue);
+        msg.put("char", charValue);
+        msg.put("double", doubleValue);
+        msg.put("float", floatValue);
+        msg.put("int", intValue);
+        msg.put("long", longValue);
+        msg.put("short", shortValue);
+        msg.put("string", stringValue);
+        msg = msg.copy();
+        assertTrue(msg.getObject("boolean") instanceof Boolean);
+        assertEquals(msg.getObject("boolean"), booleanValue);
+        assertEquals(msg.getBoolean("boolean"), booleanValue.booleanValue());
+        assertTrue(msg.getObject("byte") instanceof Byte);
+        assertEquals(msg.getObject("byte"), byteValue);
+        assertEquals(msg.getByte("byte"), byteValue.byteValue());
+        assertTrue(msg.getObject("bytes") instanceof byte[]);
+        assertEquals(((byte[]) msg.getObject("bytes")).length,
+                bytesValue.length);
+        assertEquals(msg.getBytes("bytes").length, bytesValue.length);
+        assertTrue(msg.getObject("char") instanceof Character);
+        assertEquals(msg.getObject("char"), charValue);
+        assertEquals(msg.getChar("char"), charValue.charValue());
+        assertTrue(msg.getObject("double") instanceof Double);
+        assertEquals(msg.getObject("double"), doubleValue);
+        assertEquals(msg.getDouble("double"), doubleValue.doubleValue(), 0);
+        assertTrue(msg.getObject("float") instanceof Float);
+        assertEquals(msg.getObject("float"), floatValue);
+        assertEquals(msg.getFloat("float"), floatValue.floatValue(), 0);
+        assertTrue(msg.getObject("int") instanceof Integer);
+        assertEquals(msg.getObject("int"), intValue);
+        assertEquals(msg.getInt("int"), intValue.intValue());
+        assertTrue(msg.getObject("long") instanceof Long);
+        assertEquals(msg.getObject("long"), longValue);
+        assertEquals(msg.getLong("long"), longValue.longValue());
+        assertTrue(msg.getObject("short") instanceof Short);
+        assertEquals(msg.getObject("short"), shortValue);
+        assertEquals(msg.getShort("short"), shortValue.shortValue());
+        assertTrue(msg.getObject("string") instanceof String);
+        assertEquals(msg.getObject("string"), stringValue);
+        assertEquals(msg.getString("string"), stringValue);
+    }
+
+    public void testGetMapNames() throws Exception {
+        BlazeMessage msg = new BlazeMessage();
+        msg.setBoolean("boolean", true);
+        msg.setByte("byte", (byte) 1);
+        msg.setBytes("bytes1", new byte[1]);
+        msg.setBytes("bytes2", new byte[3], 0, 2);
+        msg.setChar("char", 'a');
+        msg.setDouble("double", 1.5);
+        msg.setFloat("float", 1.5f);
+        msg.setInt("int", 1);
+        msg.setLong("long", 1);
+        msg.put("object", "stringObj");
+        msg.setShort("short", (short) 1);
+        msg.setString("string", "string");
+        msg = msg.copy();
+        Enumeration<String> mapNamesEnum = msg.getMapNames();
+        List<String> mapNamesList = Collections.list(mapNamesEnum);
+        assertEquals(mapNamesList.size(), 12);
+        assertTrue(mapNamesList.contains("boolean"));
+        assertTrue(mapNamesList.contains("byte"));
+        assertTrue(mapNamesList.contains("bytes1"));
+        assertTrue(mapNamesList.contains("bytes2"));
+        assertTrue(mapNamesList.contains("char"));
+        assertTrue(mapNamesList.contains("double"));
+        assertTrue(mapNamesList.contains("float"));
+        assertTrue(mapNamesList.contains("int"));
+        assertTrue(mapNamesList.contains("long"));
+        assertTrue(mapNamesList.contains("object"));
+        assertTrue(mapNamesList.contains("short"));
+        assertTrue(mapNamesList.contains("string"));
+    }
+
+    public void testItemExists() throws Exception {
+        BlazeMessage msg = new BlazeMessage();
+        msg.setString("exists", "test");
+        msg = msg.copy();
+        assertTrue(msg.containsKey("exists"));
+        assertFalse(msg.containsKey("doesntExist"));
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message