activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r758785 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/store/kahadb/ test/java/org/apache/activemq/broker/store/ test/java/org/apache/activemq/broker/store/kahadb/
Date Thu, 26 Mar 2009 18:17:15 GMT
Author: chirino
Date: Thu Mar 26 18:17:01 2009
New Revision: 758785

URL: http://svn.apache.org/viewvc?rev=758785&view=rev
Log:
Yay, some initial tests are now working.


Added:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=758785&r1=758784&r2=758785&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Thu Mar 26 18:17:01 2009
@@ -22,6 +22,7 @@
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -134,6 +135,10 @@
 	 */
 	public void open() throws IOException {
 		if( opened.compareAndSet(false, true) ) {
+		    if( directory ==null ) {
+		        throw new IllegalArgumentException("The directory property must be set.");
+		    }
+		    
             File lockFileName = new File(directory, "lock");
             lockFile = new LockFile(lockFileName, true);
 	        if (failIfDatabaseIsLocked) {
@@ -230,19 +235,19 @@
 	}
 	
     public void unload() throws IOException, InterruptedException {
-        indexLock.writeLock().lock();
-        try {
-            if( pageFile.isLoaded() ) {
+        if (pageFile.isLoaded()) {
+            indexLock.writeLock().lock();
+            try {
                 rootEntity.setState(CLOSED_STATE);
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         rootEntity.store(tx);
                     }
                 });
-                close();
+            } finally {
+                indexLock.writeLock().unlock();
             }
-        } finally {
-            indexLock.writeLock().unlock();
+            close();
         }
     }
 
@@ -269,7 +274,7 @@
 		        int redoCounter = 0;
 		        while (recoveryPosition != null) {
 		            final TypeCreatable message = load(recoveryPosition);
-		            final Location location = lastRecoveryPosition;
+		            final Location location = recoveryPosition;
 		            rootEntity.setLastUpdate(recoveryPosition);
 		            
 	                pageFile.tx().execute(new Transaction.Closure<IOException>() {
@@ -520,7 +525,7 @@
     public Location store(TypeCreatable data) throws IOException {
         return store(data, false);
     }
-
+    
     /**
      * All updated are are funneled through this method. The updates a converted
      * to a PBMessage which is logged to the journal and then the data from
@@ -537,10 +542,12 @@
         message.writeUnframed(os);
 
         long start = System.currentTimeMillis();
-        final Location location = journal.write(os.toByteSequence(), sync);
+        final Location location;
+        synchronized(journal) {
+            location = journal.write(os.toByteSequence(), sync);
+        }
         long start2 = System.currentTimeMillis();
         
-        
         try {
             indexLock.writeLock().lock();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
@@ -560,6 +567,74 @@
         return location;
     }
     
+    
+    public void store(List<TypeCreatable> batch) throws IOException {
+        store(batch, false);
+    }
+    
+    // ArrayList<TypeCreatable>
+    /**
+     * All updates are funneled through this method. The updates are converted
+     * to a PBMessage which is logged to the journal and then the data from
+     * the PBMessage is used to update the index just like it would be done
+     * during a recovery process.
+     * @throws IOException 
+     */
+    @SuppressWarnings("unchecked")
+    public void store(final List<TypeCreatable> batch, boolean sync) throws IOException {
+        if( batch.isEmpty() ) {
+            return;
+        }
+        if( batch.size()==1 ) {
+            store(batch.get(0), sync);
+            return;
+        }
+        
+        
+        ArrayList<ByteSequence> encodedData = new ArrayList<ByteSequence>(batch.size());
+        for (TypeCreatable data : batch) {
+            final MessageBuffer message = ((PBMessage) data).freeze();
+            int size = message.serializedSizeUnframed();
+            DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+            os.writeByte(data.toType().getNumber());
+            message.writeUnframed(os);
+            encodedData.add(os.toByteSequence());
+        }
+        
+
+        final ArrayList<Location> locations = new ArrayList<Location>(batch.size());
+        long start = System.currentTimeMillis();
+        synchronized(journal) {
+            for (ByteSequence bs : encodedData) {
+                Location location = journal.write(bs, sync);
+                locations.add(location);
+            }
+        }
+        long start2 = System.currentTimeMillis();
+        
+        try {
+            indexLock.writeLock().lock();
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    for( int i=0; i < batch.size(); i++) {
+                        TypeCreatable data = batch.get(i);
+                        MessageBuffer message = ((PBMessage) data).freeze();
+                        Location location = locations.get(i);
+                        updateIndex(tx, data.toType(), message, location);
+                        rootEntity.setLastUpdate(location);
+                    }
+                }
+            });
+        } finally {
+            indexLock.writeLock().unlock();
+        }
+
+        long end = System.currentTimeMillis();
+        if( end-start > 100 ) { 
+            LOG.warn("KahaDB long enqueue time: Journal Add Took: "+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
+        }
+    }
+
     /**
      * Loads a previously stored PBMessage
      * 
@@ -653,17 +728,54 @@
             return tx;
         }
         
+        public void close() {
+            try {
+                if( tx!=null ) {
+                    tx.rollback();
+                }
+            } catch (IOException e) {
+                throw new FatalStoreException(e);
+            } finally {
+                if( tx!=null ) {
+                    indexLock.readLock().unlock();
+                    tx=null;
+                }
+            }
+        }
+
+        public void commit() {
+            try {
+                if( tx!=null ) {
+                    tx.commit();
+                }
+                store(updates);
+            } catch (IOException e) {
+                throw new FatalStoreException(e);
+            } finally {
+                if( tx!=null ) {
+                    indexLock.readLock().unlock();
+                    tx=null;
+                }
+            }
+        }
+        
         ///////////////////////////////////////////////////////////////
         // Message related methods.
         ///////////////////////////////////////////////////////////////
         public Long messageAdd(MessageRecord message) {
             Long id = rootEntity.nextMessageKey();
             MessageAddBean bean = new MessageAddBean();
-            bean.setBuffer(message.getBuffer());
-            bean.setEncoding(message.getEncoding());
+            bean.setMessageKey(id);
             bean.setMessageId(message.getMessageId());
-            bean.setMessageKey(id); 
-            bean.setStreamKey(message.getStreamKey());
+            bean.setEncoding(message.getEncoding());
+            Buffer buffer = message.getBuffer();
+            if( buffer!=null ) {
+                bean.setBuffer(buffer);
+            }            
+            Long streamKey = message.getStreamKey();
+            if( streamKey !=null ) {
+                bean.setStreamKey(streamKey);
+            }
             updates.add(bean);
             return id;
         }
@@ -678,7 +790,18 @@
                 throw new KeyNotFoundException("message key: "+key);
             }
             try {
-                return (MessageRecord) load(location);
+                MessageAdd bean = (MessageAdd) load(location);
+                MessageRecord rc = new MessageRecord();
+                rc.setKey(bean.getMessageKey());
+                rc.setMessageId(bean.getMessageId());
+                rc.setEncoding(bean.getEncoding());
+                if( bean.hasBuffer() ) {
+                    rc.setBuffer(bean.getBuffer());
+                }
+                if( bean.hasStreamKey() ) {
+                    rc.setStreamKey(bean.getStreamKey());
+                }
+                return rc;
             } catch (IOException e) {
                 throw new FatalStoreException(e);
             }
@@ -787,13 +910,17 @@
         }
         public void transactionRollback(Buffer txid) throws KeyNotFoundException {
         }
-        
     }
     
     public <R, T extends Exception> R execute(final Callback<R, T> callback, final Runnable onFlush) throws T {
         KahaDBSession session = new KahaDBSession();
-        R rc = callback.execute(session);
-        return rc;
+        try {
+            R rc = callback.execute(session);
+            session.commit();
+            return rc;
+        } finally {
+            session.close();
+        }
     }
 
     public void flush() {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=758785&r1=758784&r2=758785&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Thu Mar 26 18:17:01 2009
@@ -32,6 +32,7 @@
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.util.LongMarshaller;
 import org.apache.kahadb.util.Marshaller;
 
 public class RootEntity {
@@ -44,6 +45,9 @@
         public RootEntity readPayload(DataInput is) throws IOException {
             RootEntity rc = new RootEntity();
             rc.state = is.readInt();
+            rc.messageKeyIndex = new BTreeIndex<Long, MessageKeys>(is.readLong());
+            rc.locationIndex = new BTreeIndex<Location, Long>(is.readLong());
+            rc.messageIdIndex = new BTreeIndex<AsciiBuffer, Long>(is.readLong());
             rc.destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(is.readLong());
             if (is.readBoolean()) {
                 rc.lastUpdate = Marshallers.LOCATION_MARSHALLER.readPayload(is);
@@ -55,6 +59,9 @@
 
         public void writePayload(RootEntity object, DataOutput os) throws IOException {
             os.writeInt(object.state);
+            os.writeLong(object.messageKeyIndex.getPageId());
+            os.writeLong(object.locationIndex.getPageId());
+            os.writeLong(object.messageIdIndex.getPageId());
             os.writeLong(object.destinationIndex.getPageId());
             if (object.lastUpdate != null) {
                 os.writeBoolean(true);
@@ -94,6 +101,10 @@
         assert pageId == 0;
         
         state = KahaDBStore.CLOSED_STATE;
+        
+        messageKeyIndex = new BTreeIndex<Long, MessageKeys>(tx.getPageFile(), tx.allocate().getPageId());
+        locationIndex = new BTreeIndex<Location, Long>(tx.getPageFile(), tx.allocate().getPageId());
+        messageIdIndex = new BTreeIndex<AsciiBuffer, Long>(tx.getPageFile(), tx.allocate().getPageId());
         destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(tx.getPageFile(), tx.allocate().getPageId());
 
         page.set(this);
@@ -101,6 +112,21 @@
     }
     
     public void load(Transaction tx) throws IOException {
+        messageKeyIndex.setPageFile(tx.getPageFile());
+        messageKeyIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+        messageKeyIndex.setValueMarshaller(MessageKeys.MARSHALLER);
+        messageKeyIndex.load(tx);
+
+        locationIndex.setPageFile(tx.getPageFile());
+        locationIndex.setKeyMarshaller(Marshallers.LOCATION_MARSHALLER);
+        locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+        locationIndex.load(tx);
+
+        messageIdIndex.setPageFile(tx.getPageFile());
+        messageIdIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+        messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+        messageIdIndex.load(tx);
+        
         destinationIndex.setPageFile(tx.getPageFile());
         destinationIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
         destinationIndex.setValueMarshaller(DestinationEntity.MARSHALLER);
@@ -129,7 +155,7 @@
     }
 
     public void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
-        long id = nextMessageKey++;
+        long id = command.getMessageKey();
         Long previous = locationIndex.put(tx, location, id);
         if( previous == null ) {
             messageIdIndex.put(tx, command.getMessageId(), id);

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=758785&r1=758784&r2=758785&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Thu Mar 26 18:17:01 2009
@@ -21,10 +21,12 @@
 
 import junit.framework.TestCase;
 
+import org.apache.activemq.broker.store.Store.Callback;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.broker.store.Store.Session;
 import org.apache.activemq.broker.store.Store.VoidCallback;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
 
 public abstract class StoreTestBase extends TestCase {
 
@@ -49,32 +51,16 @@
 
     public void testMessageAdd() throws Exception {
         final MessageRecord expected = new MessageRecord();
-        expected.setBuffer(new AsciiBuffer("buffer"));
+        expected.setBuffer(new Buffer("buffer"));
         expected.setEncoding(new AsciiBuffer("encoding"));
         expected.setMessageId(new AsciiBuffer("1000"));
 
-        store.execute(new VoidCallback<Exception>() {
-            @Override
-            public void run(Session session) throws Exception {
-                Long messageKey = session.messageAdd(expected);
-                MessageRecord actual = session.messageGetRecord(messageKey);
-                assertEquals(expected, actual);
-            }
-        }, null);
-    }
-    
-    public void testMessageAddAndGet() throws Exception {
-        final MessageRecord expected = new MessageRecord();
-        expected.setBuffer(new AsciiBuffer("buffer"));
-        expected.setEncoding(new AsciiBuffer("encoding"));
-        expected.setMessageId(new AsciiBuffer("1000"));
-
-        final Long messageKey = store.execute(new Store.Callback<Long, Exception>() {
+        final Long messageKey = store.execute(new Callback<Long, Exception>() {
             public Long execute(Session session) throws Exception {
                 return session.messageAdd(expected);
             }
         }, null);
-        
+
         store.execute(new VoidCallback<Exception>() {
             @Override
             public void run(Session session) throws Exception {
@@ -84,7 +70,6 @@
         }, null);
     }
     
-    
     public void testQueueAdd() throws Exception {
         final AsciiBuffer expected = new AsciiBuffer("test");
         store.execute(new VoidCallback<Exception>() {

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java?rev=758785&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java Thu Mar 26 18:17:01 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.kahadb;
+
+import java.io.File;
+
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.StoreTestBase;
+
+public class KahaDBStoreTest extends StoreTestBase {
+
+    @Override
+    protected Store createStore() {
+        KahaDBStore rc = new KahaDBStore();
+        rc.setDirectory(new File("target/test-data/kahadb-store-test"));
+        rc.setDeleteAllMessages(true);
+        return rc;
+    }
+
+    @Override
+    protected boolean isStorePersistent() {
+        return true;
+    }
+
+    @Override
+    protected boolean isStoreTransactional() {
+        return true;
+    }
+
+}



Mime
View raw message