activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961122 [2/3] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/resources/ activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/jaxb/ activem...
Date Wed, 07 Jul 2010 04:03:36 GMT
Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java?rev=961122&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java Wed Jul  7 04:03:34 2010
@@ -0,0 +1,424 @@
+package org.apache.activemq.broker.store.hawtdb.store;
+
+import org.apache.activemq.apollo.store.*;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdb.api.Transaction;
+import org.fusesource.hawtdb.internal.journal.Location;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Session over a {@link HawtDBManager} store: it collects store updates and
+ * index reads under a single HawtDB transaction, which is finished with
+ * {@code commit()} or {@code rollback()}.
+ * <p>
+ * Created by chirino on May 19, 2010.
+ */
+class HawtDBSession {
+
+    Data.Type.TypeCreatable atomicUpdate = null;
+    int updateCount = 0;
+
+    private Transaction tx;
+    private HawtDBManager store;
+
+    public HawtDBSession(HawtDBManager store) {
+        this.store = store;
+    }
+
+    private Transaction tx() {
+        acquireLock();
+        return tx;
+    }
+
+    public final void commit() {
+        commit(null);
+    }
+
+    /**
+     * Abandons the open transaction and clears the session's pending-update state.
+     * <p>
+     * If more than one update was recorded, a cancel-unit-of-work marker is
+     * written to the journal before the transaction itself is rolled back.
+     *
+     * @throws IllegalStateException if no transaction is open
+     * @throws FatalStoreException if an {@link IOException} is raised while rolling back
+     */
+    public final void rollback() {
+        if (tx == null) {
+            throw new IllegalStateException("Not in Transaction");
+        }
+        try {
+            if (updateCount > 1) {
+                store.journal.write(HawtDBManager.CANCEL_UNIT_OF_WORK_DATA, false);
+            }
+            tx.rollback();
+        } catch (IOException e) {
+            throw new FatalStoreException(e);
+        } finally {
+            // A transaction was open when we got here, so the bookkeeping is
+            // reset whether or not the rollback succeeded.
+            tx = null;
+            updateCount = 0;
+            atomicUpdate = null;
+        }
+    }
+
+    /**
+     * Indicates the caller's intent to start a transaction.
+     * <p>
+     * When no transaction is open, this takes the store's index write lock and
+     * opens a new page-file transaction. When one is already open it does nothing.
+     */
+    public final void acquireLock() {
+        if (tx != null) {
+            return;
+        }
+        store.indexLock.writeLock().lock();
+        tx = store.pageFile.tx();
+    }
+
+    /**
+     * Rolls back a transaction that is still open, then releases the store's
+     * index write lock.
+     * <p>
+     * The unlock sits in a {@code finally} clause, so it is attempted even when
+     * the rollback throws.
+     */
+    public final void releaseLock() {
+        final boolean rollbackNeeded = tx != null;
+        try {
+            if (rollbackNeeded) {
+                rollback();
+            }
+        } finally {
+            store.indexLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Commits the session's pending work and resets its state.
+     * <p>
+     * Three cases are distinguished:
+     * <ul>
+     * <li>a single held update: it is stored together with {@code onFlush};</li>
+     * <li>more than one update: an end-unit-of-work marker is written to the
+     *     journal together with {@code onFlush};</li>
+     * <li>otherwise: {@code onFlush}, if given, is run directly after the
+     *     transaction commit.</li>
+     * </ul>
+     * The transaction, update count and held update are cleared in every case,
+     * including failure.
+     *
+     * @param onFlush callback to hand to the store or journal, or to run directly;
+     *                {@code null} is what the no-argument {@code commit()} passes
+     * @throws FatalStoreException if an {@link IOException} is raised while committing
+     */
+    public void commit(Runnable onFlush) {
+        try {
+            // Classify the pending work before touching the store.
+            final boolean singleUpdate = atomicUpdate != null;
+            final boolean unitOfWork = !singleUpdate && updateCount > 1;
+
+            if (singleUpdate) {
+                store.store(atomicUpdate, onFlush, tx);
+            } else if (unitOfWork) {
+                store.journal.write(HawtDBManager.END_UNIT_OF_WORK_DATA, onFlush);
+            }
+
+            if (tx != null) {
+                tx.commit();
+            }
+
+            // Neither branch above took the callback, so it is invoked here.
+            if (!singleUpdate && !unitOfWork && onFlush != null) {
+                onFlush.run();
+            }
+        } catch (IOException e) {
+            throw new FatalStoreException(e);
+        } finally {
+            tx = null;
+            updateCount = 0;
+            atomicUpdate = null;
+        }
+    }
+
+    /**
+     * Pushes out the held single update, if there is one.
+     * <p>
+     * A begin-unit-of-work marker is written to the journal first, then the held
+     * update is stored and the holder is cleared. Does nothing when no update is held.
+     *
+     * @throws FatalStoreException if an {@link IOException} is raised while writing
+     */
+    private void storeAtomic() {
+        if (atomicUpdate == null) {
+            return;
+        }
+        try {
+            store.journal.write(HawtDBManager.BEGIN_UNIT_OF_WORK_DATA, false);
+            store.store(atomicUpdate, null, tx);
+            atomicUpdate = null;
+        } catch (IOException ioe) {
+            throw new FatalStoreException(ioe);
+        }
+    }
+
+    /**
+     * Registers an update with the session.
+     * <p>
+     * The first update is only held in {@code atomicUpdate}. From the second
+     * update on, a held update is pushed out through {@code storeAtomic()} and
+     * the new update is handed to the store right away.
+     *
+     * @param bean the update to record
+     * @throws FatalStoreException if an {@link IOException} is raised while storing
+     */
+    private void addUpdate(Data.Type.TypeCreatable bean) {
+        try {
+            if (updateCount != 0) {
+                // More than one update: wrap everything in a unit of work.
+                storeAtomic();
+
+                updateCount++;
+                store.store(bean, null, tx);
+            } else {
+                // First update: just remember it for now.
+                atomicUpdate = bean;
+                updateCount++;
+            }
+        } catch (IOException ioe) {
+            throw new FatalStoreException(ioe);
+        }
+    }
+
+    // /////////////////////////////////////////////////////////////
+    // Message related methods.
+    // /////////////////////////////////////////////////////////////
+
+    public void messageAdd(MessageRecord message) {
+        if (message.id < 0) {
+            throw new IllegalArgumentException("Key not set");
+        }
+        Data.MessageAdd.MessageAddBean bean = new Data.MessageAdd.MessageAddBean();
+        bean.setMessageKey(message.id);
+        bean.setMessageId(message.messageId);
+        bean.setProtocol(message.protocol);
+        bean.setMessageSize(message.size);
+        Buffer buffer = message.value;
+        if (buffer != null) {
+            bean.setValue(buffer);
+        }
+        Long streamKey = message.stream;
+        if (streamKey != null) {
+            bean.setStreamKey(streamKey);
+        }
+
+        addUpdate(bean);
+    }
+
+    public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException {
+        storeAtomic();
+        Location location = store.rootEntity.messageGetLocation(tx(), key);
+        if (location == null) {
+            throw new KeyNotFoundException("message key: " + key);
+        }
+        try {
+            Data.MessageAdd bean = (Data.MessageAdd) store.load(location);
+            MessageRecord rc = new MessageRecord();
+            rc.id = bean.getMessageKey();
+            rc.messageId = bean.getMessageId();
+            rc.protocol = bean.getProtocol();
+            rc.size = bean.getMessageSize();
+            if (bean.hasValue()) {
+                rc.value = bean.getValue();
+            }
+            if (bean.hasStreamKey()) {
+                rc.stream = bean.getStreamKey();
+            }
+            return rc;
+        } catch (IOException e) {
+            throw new FatalStoreException(e);
+        }
+    }
+
+    // /////////////////////////////////////////////////////////////
+    // Queue related methods.
+    // /////////////////////////////////////////////////////////////
+    public void queueAdd(QueueRecord record) {
+        Data.QueueAdd.QueueAddBean update = new Data.QueueAdd.QueueAddBean();
+        update.setName(record.name);
+        update.setQueueType(record.queueType);
+//        AsciiBuffer parent = record.getParent();
+//        if (parent != null) {
+//            update.setParentName(parent);
+//            update.setPartitionId(record.getPartitionKey());
+//        }
+        addUpdate(update);
+    }
+
+    public void queueRemove(QueueRecord record) {
+        addUpdate(new Data.QueueRemove.QueueRemoveBean().setKey(record.id));
+    }
+
+    public Iterator<QueueStatus> queueListByType(AsciiBuffer type, QueueRecord firstQueue, int max) {
+        storeAtomic();
+        try {
+            return store.rootEntity.queueList(tx(), type, firstQueue, max);
+        } catch (IOException e) {
+            throw new FatalStoreException(e);
+        }
+    }
+
+    public Iterator<QueueStatus> queueList(QueueRecord firstQueue, int max) {
+        storeAtomic();
+        try {
+            return store.rootEntity.queueList(tx(), null, firstQueue, max);
+        } catch (IOException e) {
+            throw new FatalStoreException(e);
+        }
+    }
+
+    /**
+     * Records the addition of an entry to a queue as a pending update of this session.
+     *
+     * @param queue       queue record; only {@code queue.id} is read here
+     * @param entryRecord entry whose queue key, message key, size and optional
+     *                    attachment are copied into the update
+     */
+    public void queueAddMessage(QueueRecord queue, QueueEntryRecord entryRecord) throws KeyNotFoundException {
+        Data.QueueAddMessage.QueueAddMessageBean bean = new Data.QueueAddMessage.QueueAddMessageBean();
+        // NOTE(review): setQueueKey is called twice in a row, so the value set here is
+        // presumably overwritten by the next line and queue.id never reaches the bean.
+        // One of the two calls may have been meant for a different field
+        // (queueRemoveMessage pairs setQueueKey with setQueueName) -- confirm against
+        // the QueueAddMessage definition.
+        bean.setQueueKey(queue.id);
+        bean.setQueueKey(entryRecord.queueKey);
+        bean.setMessageKey(entryRecord.messageKey);
+        bean.setMessageSize(entryRecord.size);
+        // The attachment is optional and is only set when present.
+        if (entryRecord.attachment != null) {
+            bean.setAttachment(entryRecord.attachment);
+        }
+        addUpdate(bean);
+    }
+
+    public void queueRemoveMessage(QueueRecord queue, Long queueKey) throws KeyNotFoundException {
+        Data.QueueRemoveMessage.QueueRemoveMessageBean bean = new Data.QueueRemoveMessage.QueueRemoveMessageBean();
+        bean.setQueueKey(queueKey);
+        bean.setQueueName(queue.name);
+        addUpdate(bean);
+    }
+
+    /**
+     * Lists entries of a queue.
+     * <p>
+     * A held single update is pushed out first via {@code storeAtomic()}, then the
+     * destination registered under {@code queue.id} is asked for its messages.
+     *
+     * @param queue         queue whose {@code id} selects the destination
+     * @param firstQueueKey passed through unchanged to {@code listMessages}
+     * @param maxQueueKey   passed through unchanged to {@code listMessages}
+     * @param max           passed through unchanged to {@code listMessages}
+     * @return the iterator produced by the destination
+     * @throws KeyNotFoundException if the store has no destination for {@code queue.id}
+     * @throws FatalStoreException  if an {@link IOException} is raised while listing
+     */
+    public Iterator<QueueEntryRecord> queueListMessagesQueue(QueueRecord queue, Long firstQueueKey, Long maxQueueKey, int max) throws KeyNotFoundException {
+        storeAtomic();
+        DestinationEntity destination = store.rootEntity.getDestination(queue.id);
+        if (destination == null) {
+            // Report the key that was actually looked up, not the record object.
+            throw new KeyNotFoundException("queue key: " + queue.id);
+        }
+        try {
+            return destination.listMessages(tx(), firstQueueKey, maxQueueKey, max);
+        } catch (IOException e) {
+            throw new FatalStoreException(e);
+        }
+    }
+
+    ////////////////////////////////////////////////////////////////
+    //Client related methods
+    ////////////////////////////////////////////////////////////////
+
+    /**
+     * Adds a subscription to the store.
+     *
+     * @throws DuplicateKeyException
+     *             if a subscription with the same name already exists
+     *
+     */
+    public void addSubscription(SubscriptionRecord record) throws DuplicateKeyException {
+        storeAtomic();
+        SubscriptionRecord old;
+        try {
+            old = store.rootEntity.getSubscription(record.name);
+            if (old != null && !old.equals(record)) {
+                throw new DuplicateKeyException("Subscription already exists: " + record.name);
+            } else {
+                updateSubscription(record);
+            }
+        } catch (IOException e) {
+            throw new FatalStoreException(e);
+        }
+    }
+
+    /**
+     * Updates a subscription in the store. If the subscription does not
+     * exist then it will simply be added.
+     */
+    public void updateSubscription(SubscriptionRecord record) {
+        Data.SubscriptionAdd.SubscriptionAddBean update = new Data.SubscriptionAdd.SubscriptionAddBean();
+        update.setName(record.name);
+        update.setDestination(record.destination);
+        update.setDurable(record.isDurable);
+
+        if (record.attachment != null) {
+            update.setAttachment(record.attachment);
+        }
+        if (record.selector != null) {
+            update.setSelector(record.selector);
+        }
+        if (record.expiration != -1) {
+            update.setTte(record.expiration);
+        }
+        addUpdate(update);
+    }
+
+    /**
+     * Removes a subscription with the given name from the store.
+     */
+    public void removeSubscription(AsciiBuffer name) {
+        Data.SubscriptionRemove.SubscriptionRemoveBean update = new Data.SubscriptionRemove.SubscriptionRemoveBean();
+        update.setName(name);
+        addUpdate(update);
+    }
+
+    /**
+     * Lists the stored subscriptions.
+     * <p>
+     * A held single update is pushed out first via {@code storeAtomic()}.
+     *
+     * @return A list of subscriptions
+     * @throws FatalStoreException if an {@link IOException} is raised while listing
+     */
+    public Iterator<SubscriptionRecord> listSubscriptions() {
+        storeAtomic();
+        try {
+            // NOTE(review): this passes the raw tx field, whereas queueList and
+            // queueListByType go through tx(), which calls acquireLock() first.
+            // Confirm whether callers are expected to have acquired the lock already.
+            return store.rootEntity.listSubsriptions(tx);
+        } catch (IOException e) {
+            throw new FatalStoreException(e);
+        }
+    }
+
+    // /////////////////////////////////////////////////////////////
+    // Map related methods.
+    // /////////////////////////////////////////////////////////////
+    public void mapAdd(AsciiBuffer map) {
+        Data.MapAdd.MapAddBean update = new Data.MapAdd.MapAddBean();
+        update.setMapName(map);
+        addUpdate(update);
+    }
+
+    public void mapRemove(AsciiBuffer map) {
+        Data.MapRemove.MapRemoveBean update = new Data.MapRemove.MapRemoveBean();
+        update.setMapName(map);
+        addUpdate(update);
+    }
+
+    public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
+        storeAtomic();
+        return store.rootEntity.mapList(first, max, tx);
+    }
+
+    public void mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) {
+        Data.MapEntryPut.MapEntryPutBean update = new Data.MapEntryPut.MapEntryPutBean();
+        update.setMapName(map);
+        update.setId(key);
+        update.setValue(value);
+        addUpdate(update);
+    }
+
+    public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
+        storeAtomic();
+        try {
+            return store.rootEntity.mapGetEntry(map, key, tx);
+        } catch (IOException e) {
+            throw new FatalStoreException(e);
+        }
+    }
+
+    public void mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
+        Data.MapEntryRemove.MapEntryRemoveBean update = new Data.MapEntryRemove.MapEntryRemoveBean();
+        update.setMapName(map);
+        update.setId(key);
+        addUpdate(update);
+    }
+
+    public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException {
+        storeAtomic();
+        try {
+            return store.rootEntity.mapListKeys(map, first, max, tx);
+        } catch (IOException e) {
+            throw new FatalStoreException(e);
+        }
+    }
+
+    // /////////////////////////////////////////////////////////////
+    // Stream related methods.
+    // /////////////////////////////////////////////////////////////
+    public Long streamOpen() {
+        return null;
+    }
+
+    public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException {
+    }
+
+    public void streamClose(Long streamKey) throws KeyNotFoundException {
+    }
+
+    public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException {
+        return null;
+    }
+
+    public boolean streamRemove(Long streamKey) {
+        return false;
+    }
+
+    // /////////////////////////////////////////////////////////////
+    // Transaction related methods.
+    // /////////////////////////////////////////////////////////////
+    public void transactionAdd(Buffer txid) {
+    }
+
+    public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException {
+    }
+
+    public void transactionCommit(Buffer txid) throws KeyNotFoundException {
+    }
+
+    public Iterator<Buffer> transactionList(Buffer first, int max) {
+        return null;
+    }
+
+    public void transactionRemoveMessage(Buffer txid, QueueRecord queueName, Long messageKey) throws KeyNotFoundException {
+    }
+
+    public void transactionRollback(Buffer txid) throws KeyNotFoundException {
+    }
+}

Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/KeyNotFoundException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/KeyNotFoundException.java?rev=961122&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/KeyNotFoundException.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/KeyNotFoundException.java Wed Jul  7 04:03:34 2010
@@ -0,0 +1,28 @@
+package org.apache.activemq.broker.store.hawtdb.store;
+
+/**
+ * Checked exception signalling that a lookup by key found nothing.
+ * <p>
+ * Created by chirino on May 19, 2010.
+ */
+public class KeyNotFoundException extends Exception {
+    private static final long serialVersionUID = -2570252319033659546L;
+
+    /** Creates an exception with neither a detail message nor a cause. */
+    public KeyNotFoundException() {
+    }
+
+    /**
+     * Creates an exception with a detail message.
+     *
+     * @param message the detail message
+     */
+    public KeyNotFoundException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that wraps another throwable.
+     *
+     * @param cause the underlying cause
+     */
+    public KeyNotFoundException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with a detail message and an underlying cause.
+     *
+     * @param message the detail message
+     * @param cause   the underlying cause
+     */
+    public KeyNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java (from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Marshallers.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Marshallers.java&r1=961121&r2=961122&rev=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/Marshallers.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/Marshallers.java Wed Jul  7 04:03:34 2010
@@ -14,14 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.store.hawtdb;
+package org.apache.activemq.broker.store.hawtdb.store;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.activemq.broker.store.QueueDescriptor;
-import org.apache.activemq.broker.store.Store.QueueRecord;
+import org.apache.activemq.apollo.store.QueueRecord;
+import org.apache.activemq.apollo.store.QueueEntryRecord;
 import org.fusesource.hawtbuf.AsciiBuffer;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtdb.util.marshaller.Marshaller;
@@ -29,74 +29,72 @@ import org.fusesource.hawtdb.util.marsha
 
 public class Marshallers {
 
-    public final static Marshaller<QueueRecord> QUEUE_RECORD_MARSHALLER = new VariableMarshaller<QueueRecord>() {
+    public final static Marshaller<QueueEntryRecord> QUEUE_RECORD_MARSHALLER = new VariableMarshaller<QueueEntryRecord>() {
 
-        public QueueRecord readPayload(DataInput dataIn) throws IOException {
-            QueueRecord rc = new QueueRecord();
-            rc.setQueueKey(dataIn.readLong());
-            rc.setMessageKey(dataIn.readLong());
-            rc.setSize(dataIn.readInt());
-            if (dataIn.readBoolean()) {
-                rc.setTte(dataIn.readLong());
-            }
-            rc.setRedelivered(dataIn.readBoolean());
+        public QueueEntryRecord readPayload(DataInput dataIn) throws IOException {
+            QueueEntryRecord rc = new QueueEntryRecord();
+            rc.queueKey = dataIn.readLong();
+            rc.messageKey = dataIn.readLong();
+            rc.size = dataIn.readInt();
+//            if (dataIn.readBoolean()) {
+//                rc.setTte(dataIn.readLong());
+//            }
+            rc.redeliveries = dataIn.readShort();
             if (dataIn.readBoolean()) {
-                rc.setAttachment(BUFFER_MARSHALLER.readPayload(dataIn));
+                rc.attachment = BUFFER_MARSHALLER.readPayload(dataIn);
             }
             return rc;
         }
 
-        public void writePayload(QueueRecord object, DataOutput dataOut) throws IOException {
-            dataOut.writeLong(object.getQueueKey());
-            dataOut.writeLong(object.getMessageKey());
-            dataOut.writeInt(object.getSize());
-            if (object.getTte() >= 0) {
+        public void writePayload(QueueEntryRecord object, DataOutput dataOut) throws IOException {
+            dataOut.writeLong(object.queueKey);
+            dataOut.writeLong(object.messageKey);
+            dataOut.writeInt(object.size);
+//            if (object.getTte() >= 0) {
+//                dataOut.writeBoolean(true);
+//                dataOut.writeLong(object.getTte());
+//            } else {
+//                dataOut.writeBoolean(false);
+//            }
+            dataOut.writeShort(object.redeliveries);
+            if (object.attachment != null) {
                 dataOut.writeBoolean(true);
-                dataOut.writeLong(object.getTte());
-            } else {
-                dataOut.writeBoolean(false);
-            }
-            dataOut.writeBoolean(object.isRedelivered());
-            if (object.getAttachment() != null) {
-                dataOut.writeBoolean(true);
-                BUFFER_MARSHALLER.writePayload(object.getAttachment(), dataOut);
+                BUFFER_MARSHALLER.writePayload(object.attachment, dataOut);
             } else {
                 dataOut.writeBoolean(false);
             }
         }
 
-        public int estimatedSize(QueueRecord object) {
+        public int estimatedSize(QueueEntryRecord object) {
             throw new UnsupportedOperationException();
         }
     };
 
-    public final static Marshaller<QueueDescriptor> QUEUE_DESCRIPTOR_MARSHALLER = new VariableMarshaller<QueueDescriptor>() {
+    public final static Marshaller<QueueRecord> QUEUE_DESCRIPTOR_MARSHALLER = new VariableMarshaller<QueueRecord>() {
 
-        public QueueDescriptor readPayload(DataInput dataIn) throws IOException {
-            QueueDescriptor descriptor = new QueueDescriptor();
-            descriptor.setQueueType(dataIn.readShort());
-            descriptor.setApplicationType(dataIn.readShort());
-            descriptor.setQueueName(ASCII_BUFFER_MARSHALLER.readPayload(dataIn));
-            if (dataIn.readBoolean()) {
-                descriptor.setParent(ASCII_BUFFER_MARSHALLER.readPayload(dataIn));
-                descriptor.setPartitionId(dataIn.readInt());
-            }
-            return descriptor;
+        public QueueRecord readPayload(DataInput dataIn) throws IOException {
+            QueueRecord record = new QueueRecord();
+            record.queueType = ASCII_BUFFER_MARSHALLER.readPayload(dataIn);
+            record.name = ASCII_BUFFER_MARSHALLER.readPayload(dataIn);
+//            if (dataIn.readBoolean()) {
+//                record.parent = ASCII_BUFFER_MARSHALLER.readPayload(dataIn)
+//                record.setPartitionId(dataIn.readInt());
+//            }
+            return record;
         }
 
-        public void writePayload(QueueDescriptor object, DataOutput dataOut) throws IOException {
-            dataOut.writeShort(object.getQueueType());
-            dataOut.writeShort(object.getApplicationType());
-            ASCII_BUFFER_MARSHALLER.writePayload(object.getQueueName(), dataOut);
-            if (object.getParent() != null) {
-                dataOut.writeBoolean(true);
-                ASCII_BUFFER_MARSHALLER.writePayload(object.getParent(), dataOut);
-                dataOut.writeInt(object.getPartitionKey());
-            } else {
-                dataOut.writeBoolean(false);
-            }
+        public void writePayload(QueueRecord object, DataOutput dataOut) throws IOException {
+            ASCII_BUFFER_MARSHALLER.writePayload(object.queueType, dataOut);
+            ASCII_BUFFER_MARSHALLER.writePayload(object.name, dataOut);
+//            if (object.parent != null) {
+//                dataOut.writeBoolean(true);
+//                ASCII_BUFFER_MARSHALLER.writePayload(object.parent, dataOut);
+//                dataOut.writeInt(object.getPartitionKey());
+//            } else {
+//                dataOut.writeBoolean(false);
+//            }
         }
-        public int estimatedSize(QueueDescriptor object) {
+        public int estimatedSize(QueueRecord object) {
             throw new UnsupportedOperationException();
         }
     };

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java (from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/MessageKeys.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/MessageKeys.java&r1=961121&r2=961122&rev=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/MessageKeys.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java Wed Jul  7 04:03:34 2010
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.store.hawtdb;
+package org.apache.activemq.broker.store.hawtdb.store;
 
 import java.io.DataInput;
 import java.io.DataOutput;

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java (from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.java&r1=961121&r2=961122&rev=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/RootEntity.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java Wed Jul  7 04:03:34 2010
@@ -14,27 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.store.hawtdb;
+package org.apache.activemq.broker.store.hawtdb.store;
 
-import java.io.*;
-import java.util.*;
-import java.util.Map.Entry;
-
-import org.apache.activemq.broker.store.QueueDescriptor;
-import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.Store.KeyNotFoundException;
-import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.broker.store.Store.SubscriptionRecord;
-import org.apache.activemq.broker.store.hawtdb.Data.MessageAdd;
-import org.apache.activemq.broker.store.hawtdb.Data.SubscriptionAdd;
-import org.apache.activemq.broker.store.hawtdb.Data.SubscriptionAdd.SubscriptionAddBuffer;
+import org.apache.activemq.apollo.store.QueueRecord;
+import org.apache.activemq.apollo.store.QueueStatus;
+import org.apache.activemq.apollo.store.SubscriptionRecord;
+import org.apache.activemq.broker.store.hawtdb.store.Data.MessageAdd;
+import org.apache.activemq.broker.store.hawtdb.store.Data.SubscriptionAdd;
+import org.apache.activemq.broker.store.hawtdb.store.Data.SubscriptionAdd.SubscriptionAddBuffer;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.proto.InvalidProtocolBufferException;
 import org.fusesource.hawtdb.api.*;
 import org.fusesource.hawtdb.internal.journal.Location;
-import org.fusesource.hawtbuf.*;
-import org.fusesource.hawtdb.util.marshaller.LongMarshaller;
 import org.fusesource.hawtdb.util.marshaller.IntegerMarshaller;
 import org.fusesource.hawtdb.util.marshaller.LocationMarshaller;
+import org.fusesource.hawtdb.util.marshaller.LongMarshaller;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
 
 public class RootEntity {
 
@@ -46,7 +47,7 @@ public class RootEntity {
     private static final BTreeIndexFactory<Long, Location> messageKeyIndexFactory = new BTreeIndexFactory<Long, Location>();
     private static final BTreeIndexFactory<Integer, Long> locationIndexFactory = new BTreeIndexFactory<Integer, Long>();
     private static final BTreeIndexFactory<Long, Long> messageRefsIndexFactory = new BTreeIndexFactory<Long, Long>();
-    private static final BTreeIndexFactory<AsciiBuffer, DestinationEntity> destinationIndexFactory = new BTreeIndexFactory<AsciiBuffer, DestinationEntity>();
+    private static final BTreeIndexFactory<Long, DestinationEntity> destinationIndexFactory = new BTreeIndexFactory<Long, DestinationEntity>();
     private static final BTreeIndexFactory<AsciiBuffer, Buffer> subscriptionIndexFactory = new BTreeIndexFactory<AsciiBuffer, Buffer>();
     private static final BTreeIndexFactory<AsciiBuffer, Integer> mapIndexFactory = new BTreeIndexFactory<AsciiBuffer, Integer>();
     private static final BTreeIndexFactory<AsciiBuffer, Buffer> mapInstanceIndexFactory = new BTreeIndexFactory<AsciiBuffer, Buffer>();
@@ -64,7 +65,7 @@ public class RootEntity {
         messageRefsIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
         messageRefsIndexFactory.setDeferredEncoding(true);
 
-        destinationIndexFactory.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+        destinationIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
         destinationIndexFactory.setValueMarshaller(DestinationEntity.MARSHALLER);
         destinationIndexFactory.setDeferredEncoding(true);
 
@@ -96,7 +97,7 @@ public class RootEntity {
         // count:
 
         // The destinations
-        private SortedIndex<AsciiBuffer, DestinationEntity> destinationIndex;
+        private SortedIndex<Long, DestinationEntity> destinationIndex;
 
         // Subscriptions
         private SortedIndex<AsciiBuffer, Buffer> subscriptionIndex;
@@ -105,7 +106,7 @@ public class RootEntity {
         private SortedIndex<AsciiBuffer, Integer> mapIndex;
 
         public void create(Transaction tx) {
-            state = HawtDBStore.CLOSED_STATE;
+            state = HawtDBManager.CLOSED_STATE;
             messageKeyIndex = messageKeyIndexFactory.create(tx, tx.alloc());
             if (USE_LOC_INDEX)
                 locationIndex = locationIndexFactory.create(tx, tx.alloc());
@@ -186,34 +187,6 @@ public class RootEntity {
             }
         }
 
-        // Build up the queue partition hierarchy:
-        try {
-            constructQueueHierarchy();
-        } catch (KeyNotFoundException e) {
-            IOException ioe = new IOException("Inconsistent store");
-            ioe.initCause(e);
-            throw ioe;
-        }
-    }
-
-    /**
-     * Constructs the mapping of parent queues to child queues.
-     * 
-     * @throws KeyNotFoundException
-     */
-    private void constructQueueHierarchy() throws KeyNotFoundException {
-        for (Entry<AsciiBuffer, DestinationEntity> entry : data.destinationIndex) {
-            DestinationEntity destination = entry.getValue();
-            QueueDescriptor queue = destination.getDescriptor();
-            if (queue.getParent() != null) {
-                DestinationEntity parent = data.destinationIndex.get(queue.getParent());
-                if (parent == null) {
-                    throw new KeyNotFoundException("Parent queue for " + queue.getQueueName() + " not found");
-                } else {
-                    parent.addPartition(destination);
-                }
-            }
-        }
     }
 
     @Deprecated // TODO: keep data immutable
@@ -270,7 +243,7 @@ public class RootEntity {
         return data.messageKeyIndex.get(messageKey);
     }
 
-    public void addMessageRef(Transaction tx, AsciiBuffer queueName, Long messageKey) {
+    public void addMessageRef(Transaction tx, Long messageKey) {
         try {
             Long refs = data.messageRefsIndex.get(messageKey);
             if (refs == null) {
@@ -279,12 +252,12 @@ public class RootEntity {
                 data.messageRefsIndex.put(messageKey, new Long(1 + refs.longValue()));
             }
         } catch (RuntimeException e) {
-            throw new Store.FatalStoreException(e);
+            throw new FatalStoreException(e);
         }
 
     }
 
-    public void removeMessageRef(Transaction tx, AsciiBuffer queueName, Long messageKey) {
+    public void removeMessageRef(Transaction tx, Long messageKey) {
         try {
             Long refs = data.messageRefsIndex.get(messageKey);
             if (refs != null) {
@@ -297,7 +270,7 @@ public class RootEntity {
                 }
             }
         } catch (RuntimeException e) {
-            throw new Store.FatalStoreException(e);
+            throw new FatalStoreException(e);
         }
     }
 
@@ -307,7 +280,7 @@ public class RootEntity {
 
     /**
      * Returns a list of all of the stored subscriptions.
-     * 
+     *
      * @param tx
      *            The transaction under which this is to be executed.
      * @return a list of all of the stored subscriptions.
@@ -327,7 +300,7 @@ public class RootEntity {
                     try {
                         rc.add(toSubscriptionRecord(b));
                     } catch (InvalidProtocolBufferException e) {
-                        throw new Store.FatalStoreException(e);
+                        throw new FatalStoreException(e);
                     }
                 }
             }
@@ -366,7 +339,7 @@ public class RootEntity {
 
     /**
      * Converts a Subscription buffer to a SubscriptionRecord.
-     * 
+     *
      * @param b
      *            The buffer
      * @return The record.
@@ -382,15 +355,15 @@ public class RootEntity {
             SubscriptionAddBuffer sab = SubscriptionAddBuffer.parseFramed(b);
             if (sab != null) {
                 rc = new SubscriptionRecord();
-                rc.setName(sab.getName());
-                rc.setDestination(sab.getDestination());
-                rc.setIsDurable(sab.getDurable());
+                rc.name = sab.getName();
+                rc.destination = sab.getDestination();
+                rc.isDurable = sab.getDurable();
                 if (sab.hasAttachment())
-                    rc.setAttachment(sab.getAttachment());
+                    rc.attachment = sab.getAttachment();
                 if (sab.hasSelector())
-                    rc.setSelector(sab.getSelector());
+                    rc.selector = sab.getSelector();
                 if (sab.hasTte())
-                    rc.setTte(sab.getTte());
+                    rc.expiration = sab.getTte();
 
             }
         }
@@ -400,68 +373,70 @@ public class RootEntity {
     // /////////////////////////////////////////////////////////////////
     // Queue Methods.
     // /////////////////////////////////////////////////////////////////
-    public void queueAdd(Transaction tx, QueueDescriptor queue) throws IOException {
-        if (data.destinationIndex.get(queue.getQueueName()) == null) {
+    public void queueAdd(Transaction tx, QueueRecord queue) throws IOException {
+        if (data.destinationIndex.get(queue.id) == null) {
             DestinationEntity rc = new DestinationEntity();
             rc.setQueueDescriptor(queue);
             rc.allocate(tx);
-            data.destinationIndex.put(queue.getQueueName(), rc);
+            data.destinationIndex.put(queue.id, rc);
         }
     }
 
-    public void queueRemove(Transaction tx, QueueDescriptor queue) throws IOException {
-        DestinationEntity destination = data.destinationIndex.get(queue.getQueueName());
+    public void queueRemove(Transaction tx, Long queueKey) throws IOException {
+        DestinationEntity destination = data.destinationIndex.get(queueKey);
         if (destination != null) {
             // Remove the message references.
             // TODO this should probably be optimized.
             Iterator<Entry<Long, Long>> messages = destination.listTrackingNums(tx);
             while (messages.hasNext()) {
                 Long messageKey = messages.next().getKey();
-                removeMessageRef(tx, queue.getQueueName(), messageKey);
+                removeMessageRef(tx, messageKey);
             }
-            data.destinationIndex.remove(queue.getQueueName());
+            data.destinationIndex.remove(queueKey);
             destination.deallocate(tx);
         }
     }
 
-    public DestinationEntity getDestination(QueueDescriptor queue) {
-        return data.destinationIndex.get(queue.getQueueName());
+    public DestinationEntity getDestination(Long queue) {
+        return data.destinationIndex.get(queue);
     }
 
-    public Iterator<QueueQueryResult> queueList(Transaction tx, short type, QueueDescriptor firstQueue, int max) throws IOException {
-        LinkedList<QueueQueryResult> results = new LinkedList<QueueQueryResult>();
+    public Iterator<org.apache.activemq.apollo.store.QueueStatus> queueList(Transaction tx, AsciiBuffer type, QueueRecord firstQueue, int max) throws IOException {
+        LinkedList<org.apache.activemq.apollo.store.QueueStatus> results = new LinkedList<org.apache.activemq.apollo.store.QueueStatus>();
 
-        final Iterator<Entry<AsciiBuffer, DestinationEntity>> i;
-        i = data.destinationIndex.iterator(firstQueue==null? null : firstQueue.getQueueName());
+        final Iterator<Entry<Long, DestinationEntity>> i;
+        Long x = firstQueue==null? null : (Long)firstQueue.id;
+        i = data.destinationIndex.iterator(x);
         while (i.hasNext()) {
-            Entry<AsciiBuffer, DestinationEntity> entry = i.next();
+            Entry<Long, DestinationEntity> entry = i.next();
             DestinationEntity de = entry.getValue();
             if (results.size() >= max) {
                 break;
             }
 
-            if (type == -1 || de.getDescriptor().getApplicationType() == type) {
+            if (type == null || type.equals(de.getQueueRecord().queueType) ) {
                 results.add(queryQueue(tx, de));
             }
         }
         return results.iterator();
     }
 
-    private final QueueQueryResult queryQueue(Transaction tx, DestinationEntity de) throws IOException {
+    private final org.apache.activemq.apollo.store.QueueStatus queryQueue(Transaction tx, DestinationEntity de) throws IOException {
 
-        QueueQueryResultImpl result = new QueueQueryResultImpl();
+        QueueStatus result = new QueueStatus();
         result.count = de.getCount(tx);
         result.size = de.getSize(tx);
-        result.firstSequence = de.getFirstSequence(tx);
-        result.lastSequence = de.getLastSequence(tx);
-        result.desc = de.getDescriptor().copy();
-        Iterator<DestinationEntity> partitions = de.getPartitions();
-        if (partitions != null && partitions.hasNext()) {
-            result.partitions = new LinkedList<QueueQueryResult>();
-            while (partitions.hasNext()) {
-                result.partitions.add(queryQueue(tx, getDestination(partitions.next().getDescriptor()) ));
-            }
-        }
+        result.first = de.getFirstSequence(tx);
+        result.last = de.getLastSequence(tx);
+        result.record = de.getQueueRecord();
+        
+//        Iterator<DestinationEntity> partitions = de.getPartitions();
+//        if (partitions != null && partitions.hasNext()) {
+//            result.partitions = new LinkedList<org.apache.activemq.apollo.store.QueueStatus>();
+//            while (partitions.hasNext()) {
+//                result.partitions.add(queryQueue(tx, getDestination(partitions.next().getQueueRecord()) ));
+//            }
+//        }
 
         return result;
     }
@@ -576,40 +551,6 @@ public class RootEntity {
         this.data.lastUpdate = lastUpdate;
     }
 
-    private static class QueueQueryResultImpl implements QueueQueryResult {
-
-        QueueDescriptor desc;
-        Collection<QueueQueryResult> partitions;
-        long size;
-        int count;
-        long firstSequence;
-        long lastSequence;
-
-        public QueueDescriptor getDescriptor() {
-            return desc;
-        }
-
-        public Collection<QueueQueryResult> getPartitions() {
-            return partitions;
-        }
-
-        public long getSize() {
-            return size;
-        }
-
-        public int getCount() {
-            return count;
-        }
-
-        public long getFirstSequence() {
-            return firstSequence;
-        }
-
-        public long getLastSequence() {
-            return lastSequence;
-        }
-    }
-
     /**
      * @param lastAppendLocation
      * @param tx

Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/VoidCallback.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/VoidCallback.java?rev=961122&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/VoidCallback.java (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/VoidCallback.java Wed Jul  7 04:03:34 2010
@@ -0,0 +1,24 @@
+package org.apache.activemq.broker.store.hawtdb.store;
+
+/**
+ * Convenience base class for implementing {@link Callback} classes which do
+ * not return a value: subclasses implement {@code run}; {@code execute} returns null.
+ */
+public abstract class VoidCallback<T extends Exception> implements Callback<Object, T> {
+
+    /**
+     * Performs the callback's work. Called by {@code execute(HawtDBSession)}.
+     * @param session
+     *            provides you access to read and update the persistent
+     *            data.
+     * @throws T
+     *             if an error occurs and the transaction should get rolled
+     *             back
+     */
+    abstract public void run(HawtDBSession session) throws T;
+
+    final public Object execute(HawtDBSession session) throws T {
+        run(session);
+        return null; // run() is void, so there is never a result to hand back
+    }
+}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java (from r961121, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StorePerformanceBase.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StorePerformanceBase.java&r1=961121&r2=961122&rev=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StorePerformanceBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerBenchmark.java Wed Jul  7 04:03:34 2010
@@ -14,8 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.store;
+package org.apache.activemq.broker.store.hawtdb.store;
 
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.activemq.apollo.store.MessageRecord;
+import org.apache.activemq.apollo.store.QueueRecord;
+import org.apache.activemq.apollo.store.QueueStatus;
+import org.apache.activemq.apollo.store.QueueEntryRecord;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.metric.Period;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -24,26 +37,14 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import junit.framework.TestCase;
-
-import org.apache.activemq.broker.store.Store.MessageRecord;
-import org.apache.activemq.broker.store.Store.QueueRecord;
-import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.broker.store.Store.VoidCallback;
-import org.apache.activemq.metric.MetricAggregator;
-import org.apache.activemq.metric.MetricCounter;
-import org.apache.activemq.metric.Period;
-import org.fusesource.hawtbuf.AsciiBuffer;
-import org.fusesource.hawtbuf.Buffer;
+public class HawtDBManagerBenchmark extends TestCase {
 
-public abstract class StorePerformanceBase extends TestCase {
-    
     private static int PERFORMANCE_SAMPLES = 50;
     private static boolean SYNC_TO_DISK = true;
     private static final boolean USE_SHARED_WRITER = true;
 
-    private Store store;
-    private QueueDescriptor queueId;
+    private HawtDBManager store;
+    private QueueRecord queueId;
     private AtomicLong queueKey = new AtomicLong(0);
 
     protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
@@ -52,10 +53,15 @@ public abstract class StorePerformanceBa
     protected ArrayList<Consumer> consumers = new ArrayList<Consumer>();
     protected ArrayList<Producer> producers = new ArrayList<Producer>();
 
-    abstract protected Store createStore();
+    protected HawtDBManager createStore() {
+        HawtDBManager rc = new HawtDBManager();
+        rc.setStoreDirectory(new File("target/test-data/kahadb-store-performance"));
+        rc.setDeleteAllMessages(true);
+        return rc;
+    }
 
     private SharedWriter writer = null;
-    
+
     private Semaphore enqueuePermits;
     private Semaphore dequeuePermits;
 
@@ -69,30 +75,30 @@ public abstract class StorePerformanceBa
             writer = new SharedWriter();
             writer.start();
         }
-        
+
         enqueuePermits = new Semaphore(20000000);
         dequeuePermits = new Semaphore(0);
-        
-        queueId = new QueueDescriptor();
-        queueId.setQueueName(new AsciiBuffer("test"));
+
+        queueId = new QueueRecord();
+        queueId.name = new AsciiBuffer("test");
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
                 session.queueAdd(queueId);
             }
         }, null);
-        
+
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
-                Iterator<Store.QueueQueryResult> qqrs = session.queueList(queueId, 1);
-                assertTrue(qqrs.hasNext());
-                Store.QueueQueryResult qqr = qqrs.next();
-                if(qqr.getSize() > 0)
+            public void run(HawtDBSession session) throws Exception {
+                Iterator<QueueStatus> qqrs = session.queueList(queueId, 1);
+                Assert.assertTrue(qqrs.hasNext());
+                QueueStatus qqr = qqrs.next();
+                if(qqr.size > 0)
                 {
-                    queueKey.set(qqr.getLastSequence() + 1);
-                    System.out.println("Recovered queue: " + qqr.getDescriptor().getQueueName() + " with " + qqr.getCount() + " messages");
-                }                   
+                    queueKey.set(qqr.last + 1);
+                    System.out.println("Recovered queue: " + qqr.record.name + " with " + qqr.count + " messages");
+                }
             }
         }, null);
     }
@@ -135,10 +141,10 @@ public abstract class StorePerformanceBa
                 public void run() {
                 }
             };
-            op.op = new Store.VoidCallback<Exception>() {
+            op.op = new VoidCallback<Exception>() {
 
                 @Override
-                public void run(Session session) throws Exception {
+                public void run(HawtDBSession session) throws Exception {
                     // TODO Auto-generated method stub
                 }
             };
@@ -148,7 +154,7 @@ public abstract class StorePerformanceBa
         }
 
         public void run() {
-            Session session = store.getSession();
+            HawtDBSession session = store.getSession();
             try {
                 LinkedList<Runnable> processed = new LinkedList<Runnable>();
                 while (!stopped.get()) {
@@ -229,11 +235,11 @@ public abstract class StorePerformanceBa
                     enqueuePermits.acquire();
 
                     final MessageRecord messageRecord = new MessageRecord();
-                    messageRecord.setKey(store.allocateStoreTracking());
-                    messageRecord.setMessageId(new AsciiBuffer("" + i));
-                    messageRecord.setEncoding(new AsciiBuffer("encoding"));
-                    messageRecord.setBuffer(buffer);
-                    messageRecord.setSize(buffer.getLength());
+                    messageRecord.id = store.allocateStoreTracking();
+                    messageRecord.messageId = new AsciiBuffer("" + i);
+                    messageRecord.protocol = new AsciiBuffer("encoding");
+                    messageRecord.value = buffer;
+                    messageRecord.size = buffer.getLength();
 
                     SharedQueueOp op = new SharedQueueOp() {
                         public void run() {
@@ -243,13 +249,13 @@ public abstract class StorePerformanceBa
 
                     op.op = new VoidCallback<Exception>() {
                         @Override
-                        public void run(Session session) throws Exception {
+                        public void run(HawtDBSession session) throws Exception {
                             session.messageAdd(messageRecord);
-                            QueueRecord queueRecord = new Store.QueueRecord();
-                            queueRecord.setMessageKey(messageRecord.getKey());
-                            queueRecord.setQueueKey(queueKey.incrementAndGet());
-                            queueRecord.setSize(messageRecord.getSize());
-                            session.queueAddMessage(queueId, queueRecord);
+                            QueueEntryRecord queueEntryRecord = new QueueEntryRecord();
+                            queueEntryRecord.messageKey = messageRecord.id;
+                            queueEntryRecord.queueKey = queueKey.incrementAndGet();
+                            queueEntryRecord.size = messageRecord.size;
+                            session.queueAddMessage(queueId, queueEntryRecord);
                             dequeuePermits.release();
                         }
                     };
@@ -318,11 +324,11 @@ public abstract class StorePerformanceBa
 
                     op.op = new VoidCallback<Exception>() {
                         @Override
-                        public void run(Session session) throws Exception {
-                            Iterator<QueueRecord> queueRecords = session.queueListMessagesQueue(queueId, 0L, -1L, 1000);
-                            for (Iterator<QueueRecord> iterator = queueRecords; iterator.hasNext();) {
-                                QueueRecord r = iterator.next();
-                                records.add(session.messageGetRecord(r.getMessageKey()));
+                        public void run(HawtDBSession session) throws Exception {
+                            Iterator<QueueEntryRecord> queueRecords = session.queueListMessagesQueue(queueId, 0L, -1L, 1000);
+                            for (Iterator<QueueEntryRecord> iterator = queueRecords; iterator.hasNext();) {
+                                QueueEntryRecord r = iterator.next();
+                                records.add(session.messageGetRecord(r.messageKey));
                                 session.queueRemoveMessage(queueId, r.queueKey);
                             }
                         }
@@ -355,8 +361,8 @@ public abstract class StorePerformanceBa
         startProducers(1);
         reportRates();
     }
-    
-    
+
+
     public void test1_1_1() throws Exception {
         startProducers(1);
         startConsumers(1);
@@ -397,4 +403,4 @@ public abstract class StorePerformanceBa
         }
     }
 
-}
+}
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java (from r961121, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StoreTestBase.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StoreTestBase.java&r1=961121&r2=961122&rev=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManagerTest.java Wed Jul  7 04:03:34 2010
@@ -14,8 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.store;
+package org.apache.activemq.broker.store.hawtdb.store;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Date;
 import java.util.HashMap;
@@ -24,25 +25,29 @@ import java.util.TreeMap;
 
 import junit.framework.TestCase;
 
-import org.apache.activemq.broker.store.Store.FatalStoreException;
-import org.apache.activemq.broker.store.Store.MessageRecord;
-import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.broker.store.Store.QueueRecord;
-import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.broker.store.Store.SubscriptionRecord;
-import org.apache.activemq.broker.store.Store.VoidCallback;
+import org.apache.activemq.apollo.store.*;
 import org.fusesource.hawtbuf.AsciiBuffer;
 import org.fusesource.hawtbuf.Buffer;
+import org.junit.Assert;
 
-public abstract class StoreTestBase extends TestCase {
+public class HawtDBManagerTest extends TestCase {
 
-    private Store store;
+    private HawtDBManager store;
 
-    abstract protected Store createStore(boolean delete);
+    protected HawtDBManager createStore(boolean delete) {
+        HawtDBManager rc = new HawtDBManager();
+        rc.setStoreDirectory(new File("target/test-data/kahadb-store-test"));
+        rc.setDeleteAllMessages(delete);
+        return rc;
+    }
 
-    abstract protected boolean isStoreTransactional();
+    protected boolean isStoreTransactional() {
+        return true;
+    }
 
-    abstract protected boolean isStorePersistent();
+    protected boolean isStorePersistent() {
+        return true;
+    }
 
     @Override
     protected void setUp() throws Exception {
@@ -59,35 +64,35 @@ public abstract class StoreTestBase exte
 
     public void testMessageAdd() throws Exception {
         final MessageRecord expected = new MessageRecord();
-        expected.setBuffer(new AsciiBuffer("buffer").buffer());
-        expected.setEncoding(new AsciiBuffer("encoding"));
-        expected.setMessageId(new AsciiBuffer("1000"));
-        expected.setKey(store.allocateStoreTracking());
-        expected.setSize(expected.getBuffer().getLength());
+        expected.value = new AsciiBuffer("buffer").buffer();
+        expected.protocol = new AsciiBuffer("encoding");
+        expected.messageId = new AsciiBuffer("1000");
+        expected.id = store.allocateStoreTracking();
+        expected.size = expected.value.getLength();
 
         store.execute(new VoidCallback<Exception>() {
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
                 session.messageAdd(expected);
             }
         }, null);
 
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
-                MessageRecord actual = session.messageGetRecord(expected.getKey());
+            public void run(HawtDBSession session) throws Exception {
+                MessageRecord actual = session.messageGetRecord(expected.id);
                 assertEquals(expected, actual);
             }
         }, null);
     }
 
     public void testQueueAdd() throws Exception {
-        final QueueDescriptor expected = new QueueDescriptor();
-        expected.setQueueName(new AsciiBuffer("testQueue"));
-        expected.setApplicationType((short) 1);
+        final QueueRecord expected = new QueueRecord();
+        expected.name = new AsciiBuffer("testQueue");
+        expected.queueType = new AsciiBuffer("testType");
 
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
                 session.queueAdd(expected);
             }
         }, null);
@@ -107,33 +112,33 @@ public abstract class StoreTestBase exte
     }
 
     public void testQueueMessageAdd() throws Exception {
-        final QueueDescriptor queue = new QueueDescriptor();
-        queue.setQueueName(new AsciiBuffer("testQueue"));
-        queue.setApplicationType((short) 1);
+        final QueueRecord queue = new QueueRecord();
+        queue.name = new AsciiBuffer("testQueue");
+        queue.queueType = new AsciiBuffer("testType");
 
         final MessageRecord message = new MessageRecord();
-        message.setBuffer(new AsciiBuffer("buffer").buffer());
-        message.setEncoding(new AsciiBuffer("encoding"));
-        message.setMessageId(new AsciiBuffer("1000"));
-        message.setKey(store.allocateStoreTracking());
-        message.setSize(message.getBuffer().getLength());
+        message.value = new AsciiBuffer("buffer").buffer();
+        message.protocol = new AsciiBuffer("encoding");
+        message.messageId = new AsciiBuffer("1000");
+        message.id = store.allocateStoreTracking();
+        message.size = message.value.getLength();
 
-        final QueueRecord qRecord = new QueueRecord();
-        qRecord.setMessageKey(message.getKey());
-        qRecord.setQueueKey(1L);
-        qRecord.setSize(message.getSize());
+        final QueueEntryRecord qEntryRecord = new QueueEntryRecord();
+        qEntryRecord.messageKey = message.id;
+        qEntryRecord.queueKey = 1L;
+        qEntryRecord.size = message.size;
 
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
                 session.queueAdd(queue);
                 session.messageAdd(message);
-                session.queueAddMessage(queue, qRecord);
+                session.queueAddMessage(queue, qEntryRecord);
             }
         }, null);
 
-        checkQueue(queue, message.getSize(), 1);
-        checkMessageRestore(queue, qRecord, message);
+        checkQueue(queue, message.size, 1);
+        checkMessageRestore(queue, qEntryRecord, message);
 
         //Restart the store and make sure the queue is still there
         if (isStorePersistent()) {
@@ -142,55 +147,55 @@ public abstract class StoreTestBase exte
             store.start();
 
             //Test that the queue was persisted
-            checkQueue(queue, message.getSize(), 1);
-            checkMessageRestore(queue, qRecord, message);
+            checkQueue(queue, message.size, 1);
+            checkMessageRestore(queue, qEntryRecord, message);
         }
     }
 
     public void testSubscriptions() throws Exception {
         HashMap<AsciiBuffer, SubscriptionRecord> expected = new HashMap<AsciiBuffer, SubscriptionRecord>();
-        
+
         final SubscriptionRecord record1 = new SubscriptionRecord();
-        record1.setName(new AsciiBuffer("sub1"));
-        record1.setIsDurable(true);
-        record1.setDestination(new AsciiBuffer("topic1"));
-        expected.put(record1.getName(), record1);
-        
+        record1.name = new AsciiBuffer("sub1");
+        record1.isDurable = true;
+        record1.destination = new AsciiBuffer("topic1");
+        expected.put(record1.name, record1);
+
         final SubscriptionRecord record2 = new SubscriptionRecord();
-        record2.setName(new AsciiBuffer("sub2"));
-        record2.setIsDurable(false);
-        record2.setDestination(new AsciiBuffer("topic2"));
-        record2.setTte(System.currentTimeMillis() + 40000);
-        record2.setSelector(new AsciiBuffer("foo"));
+        record2.name = new AsciiBuffer("sub2");
+        record2.isDurable = false;
+        record2.destination = new AsciiBuffer("topic2");
+        record2.expiration = System.currentTimeMillis() + 40000;
+        record2.selector = new AsciiBuffer("foo");
         byte[] attachment2 = new byte[1024];
         for (int i = 0; i < attachment2.length; i++) {
             attachment2[i] = (byte) i;
         }
-        record2.setAttachment(new Buffer(attachment2));
-        expected.put(record2.getName(), record2);
+        record2.attachment = new Buffer(attachment2);
+        expected.put(record2.name, record2);
 
         //They make it?
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
                 session.addSubscription(record1);
                 session.addSubscription(record2);
             }
         }, null);
-        
+
         checkSubscriptions(expected);
-        
+
         //Let's remove one:
-        expected.remove(record1.getName());
+        expected.remove(record1.name);
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
-                session.removeSubscription(record1.getName());
+            public void run(HawtDBSession session) throws Exception {
+                session.removeSubscription(record1.name);
             }
         }, null);
-        
+
         checkSubscriptions(expected);
-        
+
         //Restart the store and make sure the queue is still there
         if (isStorePersistent()) {
             store.stop();
@@ -201,48 +206,48 @@ public abstract class StoreTestBase exte
             checkSubscriptions(expected);
         }
     }
-    
-    
+
+
     public void testMap() throws Exception {
         final TreeMap<AsciiBuffer, Buffer> expected = new TreeMap<AsciiBuffer, Buffer>();
         final AsciiBuffer map = new AsciiBuffer("testMap");
-        
+
         for(int i=0; i < 100000; i++)
         {
             expected.put(new AsciiBuffer("Key" + i), new AsciiBuffer("Value" + i));
         }
-        
+
         //Test no values present:
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
                 Iterator<AsciiBuffer> r = session.mapList(null, 10);
-                assertEquals("Not expecting any maps", false, r.hasNext());
+                Assert.assertEquals("Not expecting any maps", false, r.hasNext());
             }
         }, null);
-        
+
         //Test auto add:
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
                 session.mapEntryPut(new AsciiBuffer("testMap"), expected.firstKey(), expected.get(expected.firstKey()));
                 assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), expected.get(expected.firstKey()));
             }
         }, null);
-        
+
         //Test re-add non empty map
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
                 session.mapAdd(map);
                 assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), expected.get(expected.firstKey()));
             }
         }, null);
-        
+
         //Test overwrite
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
                 AsciiBuffer overwrite = new AsciiBuffer("overwrite");
                 session.mapEntryPut(map, expected.firstKey(), overwrite);
                 assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), overwrite);
@@ -251,48 +256,48 @@ public abstract class StoreTestBase exte
 
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
                 assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), new AsciiBuffer("overwrite"));
             }
         }, null);
-        
+
         //Test map remove:
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
 //                AsciiBuffer overwrite = new AsciiBuffer("overwrite");
                 session.mapRemove(map);
                 Iterator<AsciiBuffer> r = session.mapList(null, 10);
-                assertEquals("Not expecting any maps", false, r.hasNext());
+                Assert.assertEquals("Not expecting any maps", false, r.hasNext());
             }
         }, null);
-        
+
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
                 Iterator<AsciiBuffer> r = session.mapList(null, 10);
-                assertEquals("Not expecting any maps", false, r.hasNext());
+                Assert.assertEquals("Not expecting any maps", false, r.hasNext());
             }
         }, null);
-        
-        
+
+
         //Test multiple adds:
         System.out.println(new Date() + " Adding entries");
-        
+
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
                 for(AsciiBuffer k : expected.keySet())
                 {
                     session.mapEntryPut(map, k, expected.get(k));
                 }
             }
         }, null);
-        
+
         System.out.println(new Date() + " Checking entries");
-        
+
         checkMap(map, expected);
-        
+
         //Restart the store and make sure the entries are still there
         if (isStorePersistent()) {
             store.stop();
@@ -301,18 +306,18 @@ public abstract class StoreTestBase exte
             store = createStore(false);
             store.start();
             System.out.println(new Date() + " Started store");
-            
+
 
             //Test that the queue was persisted
             checkMap(map, expected);
         }
     }
-    
+
     private void checkMap(final AsciiBuffer mapName, final TreeMap<AsciiBuffer, Buffer> expected) throws Exception
     {
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
                 Iterator<AsciiBuffer> r = session.mapEntryListKeys(mapName, expected.firstKey(), expected.size());
                 TreeMap<AsciiBuffer, Buffer> comp = new TreeMap<AsciiBuffer, Buffer>();
                 while(r.hasNext())
@@ -321,59 +326,59 @@ public abstract class StoreTestBase exte
                     Buffer value = session.mapEntryGet(mapName, key);
                     comp.put(key, new AsciiBuffer(value.data));
                 }
-                
-                assertEquals("Map in store doesn't match", expected, comp);
-                
+
+                Assert.assertEquals("Map in store doesn't match", expected, comp);
+
             }
         }, null);
     }
-    
+
     @SuppressWarnings("unchecked")
     private void checkSubscriptions(HashMap<AsciiBuffer, SubscriptionRecord> expected) throws Exception
     {
         final HashMap<AsciiBuffer, SubscriptionRecord> checkMap = (HashMap<AsciiBuffer, SubscriptionRecord>) expected.clone();
-        
+
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
+            public void run(HawtDBSession session) throws Exception {
                 Iterator<SubscriptionRecord> results = session.listSubscriptions();
                 while(results.hasNext())
                 {
                     SubscriptionRecord r = results.next();
-                    SubscriptionRecord e = checkMap.remove(r.getName());
-                    assertEquals(r, e);
+                    SubscriptionRecord e = checkMap.remove(r.name);
+                    Assert.assertEquals(r, e);
                 }
-                
+
                 //Shouldn't be any expected results left:
-                assertEquals(0, checkMap.size());
+                Assert.assertEquals(0, checkMap.size());
             }
         }, null);
     }
 
-    private void checkQueue(final QueueDescriptor queue, final long expectedSize, final long expectedCount) throws FatalStoreException, Exception {
+    private void checkQueue(final QueueRecord queue, final long expectedSize, final long expectedCount) throws FatalStoreException, Exception {
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
-                Iterator<QueueQueryResult> list = session.queueList(null, 100);
-                assertTrue(list.hasNext());
-                QueueQueryResult actual = list.next();
-                assertEquals(queue, actual.getDescriptor());
-                assertEquals(expectedSize, actual.getSize());
-                assertEquals(expectedCount, actual.getCount());
+            public void run(HawtDBSession session) throws Exception {
+                Iterator<QueueStatus> list = session.queueList(null, 100);
+                Assert.assertTrue(list.hasNext());
+                QueueStatus actual = list.next();
+                assertEquals(queue, actual.record);
+                assertEquals(expectedSize, actual.size);
+                assertEquals(expectedCount, actual.count);
             }
         }, null);
     }
 
-    private void checkMessageRestore(final QueueDescriptor queue, final QueueRecord qRecord, final MessageRecord message) throws FatalStoreException, Exception {
+    private void checkMessageRestore(final QueueRecord queue, final QueueEntryRecord qEntryRecord, final MessageRecord message) throws FatalStoreException, Exception {
         store.execute(new VoidCallback<Exception>() {
             @Override
-            public void run(Session session) throws Exception {
-                Iterator<QueueRecord> qRecords = session.queueListMessagesQueue(queue, 0L, -1L, -1);
-                assertTrue(qRecords.hasNext());
-                QueueRecord qr = qRecords.next();
-                assertEquals(qRecord.getQueueKey(), qr.getQueueKey());
-                assertEquals(qRecord.getMessageKey(), message.getKey());
-                MessageRecord record = session.messageGetRecord(qr.getMessageKey());
+            public void run(HawtDBSession session) throws Exception {
+                Iterator<QueueEntryRecord> qRecords = session.queueListMessagesQueue(queue, 0L, -1L, -1);
+                Assert.assertTrue(qRecords.hasNext());
+                QueueEntryRecord qr = qRecords.next();
+                Assert.assertEquals(qEntryRecord.queueKey, qr.queueKey);
+                Assert.assertEquals(qEntryRecord.messageKey, message.id);
+                MessageRecord record = session.messageGetRecord(qr.messageKey);
                 assertEquals(record, message);
             }
         }, null);
@@ -383,14 +388,14 @@ public abstract class StoreTestBase exte
         try {
             store.execute(new VoidCallback<Exception>() {
                 @Override
-                public void run(Session session) throws Exception {
-                    QueueDescriptor qd = new QueueDescriptor();
-                    qd.setQueueName(new AsciiBuffer("test"));
+                public void run(HawtDBSession session) throws Exception {
+                    QueueRecord qd = new QueueRecord();
+                    qd.name = new AsciiBuffer("test");
                     session.queueAdd(qd);
                     throw new IOException("Expected");
                 }
             }, null);
-            fail("Expected IOException");
+            Assert.fail("Expected IOException");
         } catch (IOException e) {
         }
 
@@ -400,28 +405,25 @@ public abstract class StoreTestBase exte
         if (isStoreTransactional()) {
             store.execute(new VoidCallback<Exception>() {
                 @Override
-                public void run(Session session) throws Exception {
-                    Iterator<QueueQueryResult> list = session.queueList(null, 100);
-                    assertFalse(list.hasNext());
+                public void run(HawtDBSession session) throws Exception {
+                    Iterator<QueueStatus> list = session.queueList(null, 100);
+                    Assert.assertFalse(list.hasNext());
                 }
             }, null);
         }
     }
 
     static void assertEquals(MessageRecord expected, MessageRecord actual) {
-        assertEquals(expected.getBuffer(), actual.getBuffer());
-        assertEquals(expected.getEncoding(), actual.getEncoding());
-        assertEquals(expected.getMessageId(), actual.getMessageId());
-        assertEquals(expected.getStreamKey(), actual.getStreamKey());
-        assertEquals(expected.getSize(), actual.getSize());
+        Assert.assertEquals(expected.value, actual.value);
+        Assert.assertEquals(expected.protocol, actual.protocol);
+        Assert.assertEquals(expected.messageId, actual.messageId);
+        Assert.assertEquals(expected.stream, actual.stream);
+        Assert.assertEquals(expected.size, actual.size);
     }
 
-    static void assertEquals(QueueDescriptor expected, QueueDescriptor actual) {
-        assertEquals(expected.getParent(), actual.getParent());
-        assertEquals(expected.getQueueType(), actual.getQueueType());
-        assertEquals(expected.getApplicationType(), actual.getApplicationType());
-        assertEquals(expected.getPartitionKey(), actual.getPartitionKey());
-        assertEquals(expected.getQueueName(), actual.getQueueName());
+    static void assertEquals(QueueRecord expected, QueueRecord actual) {
+        assertEquals(expected.queueType, actual.queueType);
+        assertEquals(expected.name, actual.name);
         //TODO test partitions?
 
     }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java (from r961121, activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/MemoryStoreTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/MemoryStoreTest.java&r1=961121&r2=961122&rev=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/test/scala/org/apache/activemq/broker/store/memory/MemoryStoreTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/MessageRecord.java Wed Jul  7 04:03:34 2010
@@ -14,26 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.store.memory;
+package org.apache.activemq.apollo.store;
 
-import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.StoreTestBase;
+import org.fusesource.hawtbuf.AsciiBuffer;
+import org.fusesource.hawtbuf.Buffer;
 
-public class MemoryStoreTest extends StoreTestBase {
-
-    @Override
-    protected Store createStore(boolean delete) {
-        return new MemoryStore();
-    }
-
-    @Override
-    protected boolean isStorePersistent() {
-        return false;
-    }
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MessageRecord {
 
-    @Override
-    protected boolean isStoreTransactional() {
-        return false;
-    }
+    public long id = -1;
+    public AsciiBuffer messageId;
+    public AsciiBuffer protocol;
+    public int size;
+    public Buffer value;
+    public long stream = -1;
+    public long expiration = 0;
 
 }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java (from r961121, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/StoreFactory.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/StoreFactory.java&r1=961121&r2=961122&rev=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/broker/store/StoreFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueEntryRecord.java Wed Jul  7 04:03:34 2010
@@ -14,17 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.store;
+package org.apache.activemq.apollo.store;
 
-import java.io.IOException;
+import org.fusesource.hawtbuf.Buffer;
 
-import org.apache.activemq.util.FactoryFinder;
-
-public class StoreFactory {
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class QueueEntryRecord {
 
-    static private final FactoryFinder STORE_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/store/");
+    public long queueKey;
+    public long messageKey;
+    public Buffer attachment;
+    public int size;
+    public short redeliveries;
 
-    public static Store createStore(String type) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException {
-        return (Store) STORE_FINDER.newInstance(type);
-    }
 }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java (from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.java&r1=961121&r2=961122&rev=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueRecord.java Wed Jul  7 04:03:34 2010
@@ -14,32 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.store.hawtdb;
+package org.apache.activemq.apollo.store;
 
-import java.io.File;
+import org.fusesource.hawtbuf.AsciiBuffer;
 
-import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.StoreTestBase;
-import org.apache.activemq.broker.store.hawtdb.HawtDBStore;
-
-public class HawtDBStoreTest extends StoreTestBase {
-
-    @Override
-    protected Store createStore(boolean delete) {
-        HawtDBStore rc = new HawtDBStore();
-        rc.setStoreDirectory(new File("target/test-data/kahadb-store-test"));
-        rc.setDeleteAllMessages(delete);
-        return rc;
-    }
-
-    @Override
-    protected boolean isStorePersistent() {
-        return true;
-    }
-
-    @Override
-    protected boolean isStoreTransactional() {
-        return true;
-    }
 
-}
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class QueueRecord {
+
+    public long id = -1;
+    public AsciiBuffer name;
+    public AsciiBuffer queueType;
+
+//    public AsciiBuffer parent;
+
+}
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueStatus.java (from r961121, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueStatus.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueStatus.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.java&r1=961121&r2=961122&rev=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/test/scala/org/apache/activemq/broker/store/hawtdb/HawtDBStoreTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/QueueStatus.java Wed Jul  7 04:03:34 2010
@@ -14,32 +14,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.store.hawtdb;
+package org.apache.activemq.apollo.store;
 
-import java.io.File;
+/**
+ * Result Holder for queue related queries.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class QueueStatus {
 
-import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.StoreTestBase;
-import org.apache.activemq.broker.store.hawtdb.HawtDBStore;
-
-public class HawtDBStoreTest extends StoreTestBase {
-
-    @Override
-    protected Store createStore(boolean delete) {
-        HawtDBStore rc = new HawtDBStore();
-        rc.setStoreDirectory(new File("target/test-data/kahadb-store-test"));
-        rc.setDeleteAllMessages(delete);
-        return rc;
-    }
-
-    @Override
-    protected boolean isStorePersistent() {
-        return true;
-    }
-
-    @Override
-    protected boolean isStoreTransactional() {
-        return true;
-    }
+    /**
+     * The descriptor for the queue.
+     */
+    public QueueRecord record;
+
+    /**
+     * Gets the count of elements in this queue. Note that this does not
+     * include counts for elements held in child partitions.
+     */
+    public int count;
+
+    /**
+     * The size of elements in this queue. Note that this does not
+     * include size of elements held in child partitions.
+     */
+    public long size;
+
+    /**
+     * The first sequence number in the queue.
+     */
+    public long first;
+
+    /**
+     * The last sequence number in the queue.
+     */
+    public long last;
 
 }

Copied: activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala (from r961121, activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala&r1=961121&r2=961122&rev=961122&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/BrokerDatabase.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-store/src/main/scala/org/apache/activemq/apollo/store/Store.scala Wed Jul  7 04:03:34 2010
@@ -20,23 +20,7 @@ import _root_.java.lang.{String}
 import org.fusesource.hawtbuf._
 import org.apache.activemq.Service
 import org.fusesource.hawtdispatch.{Retained}
-
-class StoredQueue {
-  var id:Long = -1
-  var name:AsciiBuffer = null
-  var parent:AsciiBuffer = null
-  var config:String = null
-  var first:Long = -1
-  var last:Long = -1
-  var count:Int = 0
-}
-
-class StoredMessage {
-  var id:Long = -1
-  var protocol:AsciiBuffer = null
-  var value:Buffer = null
-  var size:Int = 0
-}
+import org.apache.activemq.apollo.store._
 
 /**
  * A StoreTransaction is used to perform persistent
@@ -54,7 +38,7 @@ trait StoreTransaction extends Retained 
    * Assigns the delivery a store id if it did not already
    * have one assigned.
    */
-  def store(delivery:StoredMessage)
+  def store(delivery:MessageRecord)
 
   /**
    * Adds a delivery to a specified queue at the specified position in the queue.
@@ -71,18 +55,18 @@ trait StoreTransaction extends Retained 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait BrokerDatabase extends Service {
+trait Store extends Service {
 
 
   /**
    * Stores a queue, calls back with a unique id for the stored queue.
    */
-  def addQueue(record:StoredQueue)(cb:(Option[Long])=>Unit):Unit
+  def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit
 
   /**
    * Loads the queue information for a given queue id.
    */
-  def getQueueInfo(id:Long)(cb:(Option[StoredQueue])=>Unit )
+  def getQueueStatus(id:Long)(cb:(Option[QueueStatus])=>Unit )
 
   /**
    * gets a listing of all queues previously added.
@@ -99,7 +83,7 @@ trait BrokerDatabase extends Service {
   /**
    * Loads a delivery with the associated id from persistent storage.
    */
-  def loadMessage(id:Long)(cb:(Option[StoredMessage])=>Unit )
+  def loadMessage(id:Long)(cb:(Option[MessageRecord])=>Unit )
 
   /**
    * Creates a StoreTransaction which is used to perform persistent



Mime
View raw message