From commits-return-12103-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Sat Oct 24 02:14:58 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 66662 invoked from network); 24 Oct 2009 02:14:58 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 24 Oct 2009 02:14:58 -0000 Received: (qmail 47769 invoked by uid 500); 24 Oct 2009 02:14:58 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 47714 invoked by uid 500); 24 Oct 2009 02:14:58 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 47705 invoked by uid 99); 24 Oct 2009 02:14:58 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 24 Oct 2009 02:14:58 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 24 Oct 2009 02:14:46 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 48FAC2388895; Sat, 24 Oct 2009 02:14:23 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r829298 - /activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ Date: Sat, 24 Oct 2009 02:14:22 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091024021423.48FAC2388895@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Sat Oct 24 02:14:21 2009 New Revision: 829298 URL: http://svn.apache.org/viewvc?rev=829298&view=rev Log: Aextracted out the HawtPageFile 
inner clasess as prep for implementing more rigorous thread synchronizations. Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/DeferredUpdate.java activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java?rev=829298&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java (added) +++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java Sat Oct 24 02:14:21 2009 @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hawtdb.internal.page; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Map.Entry; + +import org.apache.activemq.util.list.LinkedNode; +import org.apache.activemq.util.list.LinkedNodeList; +import org.apache.hawtdb.api.Paged; + +/** + * Aggregates a group of commits so that they can be more efficiently + * stored to disk. 
+ * + * @author Hiram Chirino + */ +class Batch extends LinkedNode implements Externalizable, Iterable { + private static final long serialVersionUID = 1188640492489990493L; + + /** the pageId that this redo batch is stored at */ + int page=-1; + /** points to a previous redo batch page */ + public int previous=-1; + /** was the redo loaded in the {@link recover} method */ + boolean recovered; + + /** the commits and snapshots in the redo */ + final LinkedNodeList entries = new LinkedNodeList(); + /** tracks how many snapshots are referencing the redo */ + int references; + /** the oldest commit in this redo */ + public long base=-1; + /** the newest commit in this redo */ + public long head; + + boolean performed; + + public Batch() { + } + + public boolean isPerformed() { + return performed; + } + + public Batch(long head) { + this.head = head; + } + + public String toString() { + return "{ page: "+this.page+", base: "+base+", head: "+head+", references: "+references+", entries: "+entries.size()+" }"; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(head); + out.writeLong(base); + out.writeInt(previous); + + // Only need to store the commits. 
+ ArrayList l = new ArrayList(); + for (Commit commit : this) { + l.add(commit); + } + out.writeObject(l); + } + + @SuppressWarnings("unchecked") + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + head = in.readLong(); + base = in.readLong(); + previous = in.readInt(); + ArrayList l = (ArrayList) in.readObject(); + for (Commit commit : l) { + entries.addLast(commit); + } + } + + public int pageCount() { + int rc = 0; + for (Commit commit : this) { + rc += commit.updates.size(); + } + return rc; + } + + @Override + public Iterator iterator() { + return new Iterator() { + Commit next = nextCommit(entries.getHead()); + Commit last; + + @Override + public boolean hasNext() { + return next!=null; + } + + @Override + public Commit next() { + if( next==null ) { + throw new NoSuchElementException(); + } + last = next; + next = nextCommit(next.getNext()); + return last; + } + + @Override + public void remove() { + if( last==null ) { + throw new IllegalStateException(); + } + last.unlink(); + } + }; + } + + + private Commit nextCommit(BatchEntry entry) { + while( entry != null ) { + Commit commit = entry.isCommit(); + if( commit!=null ) { + return commit; + } + entry = entry.getNext(); + } + return null; + } + + public void performDefferedUpdates(Paged pageFile) { + for (Commit commit : this) { + if( commit.updates != null ) { + for (Entry entry : commit.updates.entrySet()) { + DeferredUpdate du = entry.getValue().deferredUpdate(); + if( du == null ) { + continue; + } + if( du.wasDeferredClear() ) { + List freePages = du.marshaller.remove(pageFile, du.page); + for (Integer page : freePages) { + commit.merge(pageFile.allocator(), page, Update.update(page).freed()); + } + } else if( du.wasDeferredStore() ) { + List allocatedPages = du.store(pageFile); + for (Integer page : allocatedPages) { + // add any allocated pages to the update list so that the free + // list gets properly adjusted. 
+ commit.merge(pageFile.allocator(), page, Update.update(page).allocated()); + } + } + } + } + } + } + + public void freeRedoSpace(SimpleAllocator allocator) { + for (Commit commit : this) { + for (Entry entry : commit.updates.entrySet()) { + int key = entry.getKey(); + Update value = entry.getValue(); + if( value.wasFreed() ) { + allocator.free(key, 1); + } else if( key != value.page ) { + // need to free the udpate page.. + allocator.free(value.page, 1); + } + } + } + } + +} \ No newline at end of file Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java?rev=829298&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java (added) +++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java Sat Oct 24 02:14:21 2009 @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hawtdb.internal.page; + +import org.apache.activemq.util.list.LinkedNode; + +/** + * A BatchEntry is a linked list node which may be a Commit record or a SnapshotHead. + * + * @author Hiram Chirino + */ +abstract class BatchEntry extends LinkedNode { + + Commit isCommit() { + return null; + } + + SnapshotHead isSnapshotHead() { + return null; + } +} \ No newline at end of file Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java?rev=829298&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java (added) +++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java Sat Oct 24 02:14:21 2009 @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hawtdb.internal.page; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hawtdb.api.Allocator; +import org.apache.hawtdb.api.OptimisticUpdateException; + +/** + * Tracks the updates that were part of a transaction commit. + * Multiple commit objects can be merged into a single commit. + * + * A Commit is a BatchEntry and stored in Batch object. + * + * @author Hiram Chirino + */ +final class Commit extends BatchEntry implements Externalizable { + + /** oldest revision in the commit range. */ + private long base; + /** newest revision in the commit range, will match base if this only tracks one commit */ + private long head; + + /** all the page updates that are part of the redo */ + ConcurrentHashMap updates; + + + public Commit() { + } + + public Commit(long version, ConcurrentHashMap updates) { + this.head = this.base = version; + this.updates = updates; + } + + + @Override + Commit isCommit() { + return this; + } + + + public String toString() { + int updateSize = updates==null ? 0 : updates.size(); + return "{ base: "+this.base+", head: "+this.head+", updates: "+updateSize+" }"; + } + + public long commitCheck(Map newUpdate) { + for (Integer page : newUpdate.keySet()) { + if( updates.containsKey( page ) ) { + throw new OptimisticUpdateException(); + } + } + return head; + } + + public void merge(Allocator allocator, long rev, ConcurrentHashMap updates) { + assert head+1 == rev; + head=rev; + // merge all the entries in the update.. + for (Entry entry : updates.entrySet()) { + merge(allocator, entry.getKey(), entry.getValue()); + } + } + + /** + * merges one update.. 
+ * + * @param page + * @param update + */ + void merge(Allocator allocator, int page, Update update) { + Update previous = this.updates.put(page, update); + if (previous != null) { + + if( update.wasFreed() ) { + // we can undo the previous update + if( previous.page != page ) { + allocator.free(previous.page, 1); + } + if( previous.wasAllocated() ) { + allocator.free(page, 1); + } + this.updates.remove(page); + + // No other merging is needed now.. + return; + } + + // we are undoing the previous update /w this new update. + if( previous.page != page ) { + allocator.free(previous.page, 1); + } + + // we may be updating a previously allocated page, + // if so we need to mark the new page as allocated too. + if( previous.wasAllocated() ) { + update.allocated(); + } + } + } + + @SuppressWarnings("unchecked") + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + base = in.readLong(); + head = in.readLong(); + updates = (ConcurrentHashMap) in.readObject(); + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(base); + out.writeLong(head); + out.writeObject(updates); + } + +} \ No newline at end of file Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/DeferredUpdate.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/DeferredUpdate.java?rev=829298&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/DeferredUpdate.java (added) +++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/DeferredUpdate.java Sat Oct 24 02:14:21 2009 @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hawtdb.internal.page; + +import java.io.ObjectStreamException; +import java.util.List; + +import org.apache.hawtdb.api.EncoderDecoder; +import org.apache.hawtdb.api.Paged; + +/** + * A deferred update is an update which has not yet been performed, but + * which holds onto all the info needed to do the update. + * + * Encoding java objects to do page updates can be CPU intensive, and if + * the same pages are getting updated frequently then deferring updates + * will save encoding passes since older updates may get discarded due + * to a more recent update of the same page. 
+ * + * @author Hiram Chirino + */ +class DeferredUpdate extends Update { + EncoderDecoder marshaller; + Object value; + + public DeferredUpdate(Update update) { + super(update); + } + public DeferredUpdate(int page) { + super(page); + } + + public static DeferredUpdate deferred(int page) { + return new DeferredUpdate(page); + } + + public static DeferredUpdate deferred(Update update) { + return new DeferredUpdate(update); + } + + public DeferredUpdate deferredUpdate() { + return this; + } + + public DeferredUpdate store(Object value, EncoderDecoder marshaller) { + this.value = value; + this.marshaller = marshaller; + flags = (byte) ((flags & ~PAGE_CLEAR) | PAGE_STORE); + return this; + } + + public DeferredUpdate clear(EncoderDecoder marshaller) { + this.marshaller= marshaller; + this.value=null; + flags = (byte) ((flags & ~PAGE_STORE) | PAGE_CLEAR); + return this; + } + + @SuppressWarnings("unchecked") + T value() { + return (T) value; + } + + @SuppressWarnings("unchecked") + public List store(Paged paged) { + return ((EncoderDecoder)marshaller).store(paged, page, value); + } + + public Object writeReplace() throws ObjectStreamException { + return new Update(this); + } + +} \ No newline at end of file Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java?rev=829298&r1=829297&r2=829298&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java (original) +++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java Sat Oct 24 02:14:21 2009 @@ -16,20 +16,12 @@ */ package org.apache.hawtdb.internal.page; -import java.io.Externalizable; import java.io.IOException; -import java.io.ObjectInput; 
import java.io.ObjectInputStream; -import java.io.ObjectOutput; import java.io.ObjectOutputStream; -import java.io.ObjectStreamException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.zip.CRC32; @@ -37,14 +29,9 @@ import javolution.io.Struct; import org.apache.activemq.util.LRUCache; -import org.apache.activemq.util.buffer.Buffer; -import org.apache.activemq.util.list.LinkedNode; import org.apache.activemq.util.list.LinkedNodeList; -import org.apache.hawtdb.api.Allocator; import org.apache.hawtdb.api.EncoderDecoder; import org.apache.hawtdb.api.IOPagingException; -import org.apache.hawtdb.api.OptimisticUpdateException; -import org.apache.hawtdb.api.Paged; import org.apache.hawtdb.api.PagingException; import org.apache.hawtdb.api.Transaction; import org.apache.hawtdb.api.Paged.SliceType; @@ -72,10 +59,8 @@ */ public final class HawtPageFile { - private static final String MAGIC = "HawtDB:1.0\n"; - private static final int FILE_HEADER_SIZE = 1024 * 4; - - public static final int HEADER_SIZE = 1024*4; + public static final int FILE_HEADER_SIZE = 1024 * 4; + public static final String MAGIC = "HawtDB:1.0\n"; /** * The first 4K of the file is used to hold 2 copies of the header. @@ -115,463 +100,18 @@ } } - abstract static class RedoEntry extends LinkedNode { - Commit isCommit() { - return null; - } - SnapshotHead isSnapshotHead() { - return null; - } - } - - /** - * Tracks the page changes that were part of a commit. - * - * Commits can be merged, in that sense this then tracks range of commits. - * - * @author chirino - */ - final static class Commit extends RedoEntry implements Externalizable { - - /** oldest revision in the commit range. 
*/ - private long base; - /** newest revision in the commit range, will match base if this only tracks one commit */ - private long head; - /** all the page updates that are part of the redo */ - private ConcurrentHashMap updates; - - - public Commit() { - } - - public Commit(long version, ConcurrentHashMap updates) { - this.head = this.base = version; - this.updates = updates; - } - - - @Override - Commit isCommit() { - return this; - } - - - public String toString() { - int updateSize = updates==null ? 0 : updates.size(); - return "{ base: "+this.base+", head: "+this.head+", updates: "+updateSize+" }"; - } - - public long commitCheck(Map newUpdate) { - for (Integer page : newUpdate.keySet()) { - if( updates.containsKey( page ) ) { - throw new OptimisticUpdateException(); - } - } - return head; - } - - public void merge(Allocator allocator, long rev, ConcurrentHashMap updates) { - assert head+1 == rev; - head=rev; - // merge all the entries in the update.. - for (Entry entry : updates.entrySet()) { - merge(allocator, entry.getKey(), entry.getValue()); - } - } - - /** - * merges one update.. - * - * @param page - * @param update - */ - private void merge(Allocator allocator, int page, Update update) { - Update previous = this.updates.put(page, update); - if (previous != null) { - - if( update.wasFreed() ) { - // we can undo the previous update - if( previous.page != page ) { - allocator.free(previous.page, 1); - } - if( previous.wasAllocated() ) { - allocator.free(page, 1); - } - this.updates.remove(page); - - // No other merging is needed now.. - return; - } - - // we are undoing the previous update /w this new update. - if( previous.page != page ) { - allocator.free(previous.page, 1); - } - - // we may be updating a previously allocated page, - // if so we need to mark the new page as allocated too. 
- if( previous.wasAllocated() ) { - update.allocated(); - } - } - } - - @SuppressWarnings("unchecked") - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - base = in.readLong(); - head = in.readLong(); - updates = (ConcurrentHashMap) in.readObject(); - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(base); - out.writeLong(head); - out.writeObject(updates); - } - - } - - final class Snapshot { - final SnapshotHead head; - final Redo base; - - public Snapshot(SnapshotHead head, Redo base) { - this.head = head; - this.base = base; - } - - public Snapshot open() { - head.open(base); - return this; - } - - public void close() { - synchronized(TRANSACTION_MUTEX) { - head.close(base); - } - } - } - - /** - * Provides a snapshot view of the page file. - * - * @author chirino - */ - final class SnapshotHead extends RedoEntry { - final Redo parent; - - public SnapshotHead(Redo parent) { - this.parent = parent; - } - - /** The number of times this snapshot has been opened. */ - protected int references; - - public String toString() { - return "{ references: "+this.references+" }"; - } - - SnapshotHead isSnapshotHead() { - return this; - } - - public void read(int pageId, Buffer buffer) throws IOPagingException { - pageId = mapPageId(pageId); - pageFile.read(pageId, buffer); - } - - public ByteBuffer slice(int pageId, int count) { - pageId = mapPageId(pageId); - return pageFile.slice(SliceType.READ, pageId, count); - } - - public void open(Redo base) { - references++; - while( true ) { - base.references++; - if(base == parent ) { - break; - } - base = base.getNext(); - } - } - - public void close(Redo base) { - references--; - while( true ) { - base.references--; - if(base == parent ) { - break; - } - base = base.getNext(); - } - - if( references==0 ) { - unlink(); - // TODO: trigger merging of adjacent commits. 
- } - } - - public int mapPageId(int page) { - // Look for the page in the previous commits.. - Redo curRedo = parent; - RedoEntry curEntry = getPrevious(); - while( true ) { - if( curRedo.isPerformed() ) { - break; - } - - while( curEntry!=null ) { - Commit commit = curEntry.isCommit(); - if( commit !=null ) { - Update update = commit.updates.get(page); - if( update!=null ) { - return update.page(); - } - } - curEntry = curEntry.getPrevious(); - } - - curRedo = curRedo.getPrevious(); - if( curRedo==null ) { - break; - } - curEntry = curRedo.entries.getTail(); - } - return page; - } - - - public T cacheLoad(EncoderDecoder marshaller, int page) { - Redo curRedo = parent; - RedoEntry curEntry = getPrevious(); - while( true ) { - if( curRedo.isPerformed() ) { - break; - } - - while( curEntry!=null ) { - Commit commit = curEntry.isCommit(); - if( commit !=null ) { - Update update = commit.updates.get(page); - if( update!=null ) { - DeferredUpdate du = update.deferredUpdate(); - if (du!=null) { - return du.value(); - } - } - } - curEntry = curEntry.getPrevious(); - } - - curRedo = curRedo.getPrevious(); - if( curRedo==null ) { - break; - } - curEntry = curRedo.entries.getTail(); - } - return readCache.cacheLoad(marshaller, page); - } - - public long commitCheck(Map pageUpdates) { - long rc=0; - Redo curRedo = parent; - RedoEntry curEntry = getNext(); - while( true ) { - while( curEntry!=null ) { - Commit commit = curEntry.isCommit(); - if( commit!=null ) { - rc = commit.commitCheck(pageUpdates); - } - curEntry = curEntry.getNext(); - } - - curRedo = curRedo.getNext(); - if( curRedo==null ) { - break; - } - curEntry = curRedo.entries.getHead(); - } - return rc; - } - - } - - /** - * Aggregates a group of commits so that they can be more efficiently operated against. 
- * - */ - static class Redo extends LinkedNode implements Externalizable, Iterable { - private static final long serialVersionUID = 1188640492489990493L; - - /** the pageId that this redo batch is stored at */ - private int page=-1; - /** points to a previous redo batch page */ - public int previous=-1; - /** was the redo loaded in the {@link recover} method */ - private boolean recovered; - - /** the commits and snapshots in the redo */ - private final LinkedNodeList entries = new LinkedNodeList(); - /** tracks how many snapshots are referencing the redo */ - private int references; - /** the oldest commit in this redo */ - public long base=-1; - /** the newest commit in this redo */ - public long head; - - private boolean performed; - - public Redo() { - } - - public boolean isPerformed() { - return performed; - } - - public Redo(long head) { - this.head = head; - } - - public String toString() { - return "{ page: "+this.page+", base: "+base+", head: "+head+", references: "+references+", entries: "+entries.size()+" }"; - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(head); - out.writeLong(base); - out.writeInt(previous); - - // Only need to store the commits. 
- ArrayList l = new ArrayList(); - for (Commit commit : this) { - l.add(commit); - } - out.writeObject(l); - } - - @SuppressWarnings("unchecked") - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - head = in.readLong(); - base = in.readLong(); - previous = in.readInt(); - ArrayList l = (ArrayList) in.readObject(); - for (Commit commit : l) { - entries.addLast(commit); - } - } - - public int pageCount() { - int rc = 0; - for (Commit commit : this) { - rc += commit.updates.size(); - } - return rc; - } - - @Override - public Iterator iterator() { - return new Iterator() { - Commit next = nextCommit(entries.getHead()); - Commit last; - - @Override - public boolean hasNext() { - return next!=null; - } - - @Override - public Commit next() { - if( next==null ) { - throw new NoSuchElementException(); - } - last = next; - next = nextCommit(next.getNext()); - return last; - } - - @Override - public void remove() { - if( last==null ) { - throw new IllegalStateException(); - } - last.unlink(); - } - }; - } - - - private Commit nextCommit(RedoEntry entry) { - while( entry != null ) { - Commit commit = entry.isCommit(); - if( commit!=null ) { - return commit; - } - entry = entry.getNext(); - } - return null; - } - - public void performDefferedUpdates(Paged pageFile) { - for (Commit commit : this) { - if( commit.updates != null ) { - for (Entry entry : commit.updates.entrySet()) { - DeferredUpdate du = entry.getValue().deferredUpdate(); - if( du == null ) { - continue; - } - if( du.wasDeferredClear() ) { - List freePages = du.marshaller.remove(pageFile, du.page); - for (Integer page : freePages) { - commit.merge(pageFile.allocator(), page, Update.update(page).freed()); - } - } else if( du.wasDeferredStore() ) { - List allocatedPages = du.store(pageFile); - for (Integer page : allocatedPages) { - // add any allocated pages to the update list so that the free - // list gets properly adjusted. 
- commit.merge(pageFile.allocator(), page, Update.update(page).allocated()); - } - } - } - } - } - } - - public void freeRedoSpace(SimpleAllocator allocator) { - for (Commit commit : this) { - for (Entry entry : commit.updates.entrySet()) { - int key = entry.getKey(); - Update value = entry.getValue(); - if( value.wasFreed() ) { - allocator.free(key, 1); - } else if( key != value.page ) { - // need to free the udpate page.. - allocator.free(value.page, 1); - } - } - } - } - - } - private final MemoryMappedFile file; final SimpleAllocator allocator; final PageFile pageFile; private static final int updateBatchSize = 1024; private final boolean synch; - /** The header structure of the file */ private final Header header = new Header(); int lastRedoPage = -1; - private final LinkedNodeList redos = new LinkedNodeList(); + private final LinkedNodeList redos = new LinkedNodeList(); // // The following Redo objects point to linked nodes in the previous redo list. @@ -579,22 +119,22 @@ // /** The current redo that is currently being built */ - Redo buildingRedo; + Batch buildingRedo; /** The stored redos. These might be be recoverable. */ - Redo storedRedos; + Batch storedRedos; /** The synced redos. A file sync occurred after these redos were stored. */ - Redo syncedRedos; + Batch syncedRedos; /** The performed redos. Updates are actually performed to the original page file. */ - Redo performedRedos; + Batch performedRedos; /** Used as read cache */ - private ReadCache readCache = new ReadCache(); + ReadCache readCache = new ReadCache(); /** Mutex for data structures which are used during house keeping tasks like redo management. Once acquired, you can also acquire the TRANSACTION_MUTEX */ private final Object HOUSE_KEEPING_MUTEX = "HOUSE_KEEPING_MUTEX"; /** Mutex for data structures which transaction threads access. Never attempt to acquire the HOUSE_KEEPING_MUTEX once this mutex is acquired. 
*/ - private final Object TRANSACTION_MUTEX = "TRANSACTION_MUTEX"; + final Object TRANSACTION_MUTEX = "TRANSACTION_MUTEX"; /** @@ -638,10 +178,10 @@ * @param to * @return string representation of the redo items from the specified redo up to (exclusive) the specified redo. */ - private String toString(Redo from, Redo to) { + private String toString(Batch from, Batch to) { StringBuilder rc = new StringBuilder(); rc.append("[ "); - Redo t = from; + Batch t = from; while( t!=null && t!=to ) { if( t!=from ) { rc.append(", "); @@ -679,7 +219,7 @@ // Note: every deferred update has an entry in the pageUpdates, so no need to // check to see if that map also conflicts. - rev = snapshot.head.commitCheck(pageUpdates); + rev = snapshot.getHead().commitCheck(pageUpdates); snapshot.close(); } else { rev = buildingRedo.head; @@ -692,7 +232,7 @@ buildingRedo.head = rev; Commit commit=null; - RedoEntry last = buildingRedo.entries.getTail(); + BatchEntry last = buildingRedo.entries.getTail(); if( last!=null ) { commit = last.isCommit(); } @@ -726,7 +266,7 @@ public void reset() { synchronized (HOUSE_KEEPING_MUTEX) { redos.clear(); - performedRedos = syncedRedos = storedRedos = buildingRedo = new Redo(-1); + performedRedos = syncedRedos = storedRedos = buildingRedo = new Batch(-1); redos.addFirst(buildingRedo); lastRedoPage = -1; @@ -756,7 +296,7 @@ synchronized (HOUSE_KEEPING_MUTEX) { redos.clear(); - performedRedos = syncedRedos = storedRedos = buildingRedo = new Redo(-1); + performedRedos = syncedRedos = storedRedos = buildingRedo = new Batch(-1); redos.addFirst(buildingRedo); lastRedoPage = -1; readCache.clear(); @@ -797,7 +337,7 @@ } - Redo redo = loadObject(pageId); + Batch redo = loadObject(pageId); redo.page = pageId; redo.recovered = true; Extent.unfree(pageFile, pageId); @@ -851,14 +391,14 @@ * Attempts to perform a redo state change: building -> stored */ private void storeRedos(boolean force) { - Redo redo; + Batch redo; // We synchronized /w the transactions so that they 
see the state change. synchronized (TRANSACTION_MUTEX) { // Re-checking since storing the redo may not be needed. if( (force && buildingRedo.base!=-1 ) || buildingRedo.pageCount() > updateBatchSize ) { redo = buildingRedo; - buildingRedo = new Redo(redo.head); + buildingRedo = new Batch(redo.head); redos.addLast(buildingRedo); } else { return; @@ -898,7 +438,7 @@ // Update the base_revision with the last performed revision. if (performedRedos!=syncedRedos) { - Redo lastPerformedRedo = syncedRedos.getPrevious(); + Batch lastPerformedRedo = syncedRedos.getPrevious(); h.base_revision.set(lastPerformedRedo.head); } @@ -906,7 +446,7 @@ if (storedRedos!=buildingRedo) { // The last stored is actually synced now.. - Redo lastStoredRedo = buildingRedo.getPrevious(); + Batch lastStoredRedo = buildingRedo.getPrevious(); // Let the header know about it.. h.redo_page.set(lastStoredRedo.page); @@ -964,7 +504,7 @@ // The last performed redo MIGHT still have an open snapshot. // we can't transition from synced, until that snapshot closes. - Redo lastPerformed = syncedRedos.getPrevious(); + Batch lastPerformed = syncedRedos.getPrevious(); if( lastPerformed!=null && lastPerformed.references!=0) { return; } @@ -1038,19 +578,19 @@ SnapshotHead head=null; // re-use the last entry if it was a snapshot head.. - RedoEntry entry = buildingRedo.entries.getTail(); + BatchEntry entry = buildingRedo.entries.getTail(); if( entry!=null ) { head = entry.isSnapshotHead(); } if( head == null ) { // create a new snapshot head entry.. - head = new SnapshotHead(buildingRedo); + head = new SnapshotHead(this, buildingRedo); buildingRedo.entries.addLast(head); } // Open the snapshot off that head position. 
- return new Snapshot(head, syncedRedos).open(); + return new Snapshot(this, head, syncedRedos).open(); } } @@ -1136,8 +676,7 @@ final class ReadCache { private final Map map = Collections.synchronizedMap(new LRUCache(1024)); - @SuppressWarnings("unchecked") - private T cacheLoad(EncoderDecoder marshaller, int pageId) { + @SuppressWarnings("unchecked") T cacheLoad(EncoderDecoder marshaller, int pageId) { T rc = (T) map.get(pageId); if( rc ==null ) { rc = marshaller.load(pageFile, pageId); @@ -1149,144 +688,5 @@ public void clear() { map.clear(); } - } - - public static final byte PAGE_ALLOCATED = 0x01 << 0; - public static final byte PAGE_FREED = 0x01 << 1; - public static final byte PAGE_STORE = 0x01 << 2; - public static final byte PAGE_CLEAR = 0x01 << 3; - - static class Update implements Externalizable { - - private static final long serialVersionUID = -1128410792448869134L; - - byte flags; - int page; - - public Update() { - } - - public Update(Update udpate) { - this.page = udpate.page; - this.flags = (byte) (udpate.flags & (PAGE_ALLOCATED|PAGE_FREED)); - } - - public Update(int page) { - this.page = page; - } - - public static Update update(Update update) { - return new Update(update); - } - public static Update update(int page) { - return new Update(page); - } - - public int page() { - if( wasFreed() ) { - throw new PagingException("You should never try to read or write page that has been freed."); - } - return page; - } - - public DeferredUpdate deferredUpdate() { - return null; - } - - public Update allocated() { - flags = (byte) ((flags & ~PAGE_FREED) | PAGE_ALLOCATED); - return this; - } - - public Update freed() { - flags = (byte) ((flags & ~PAGE_ALLOCATED) | PAGE_FREED); - return this; - } - - public boolean wasFreed() { - return (flags & PAGE_FREED)!=0 ; - } - public boolean wasAllocated() { - return (flags & PAGE_ALLOCATED)!=0; - } - public boolean wasDeferredStore() { - return (flags & PAGE_STORE)!=0 ; - } - public boolean wasDeferredClear() { - 
return (flags & PAGE_CLEAR)!=0; - } - - @Override - public String toString() { - return "{ page: "+page+", flags: "+flags+" }"; - } - - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - page = in.readInt(); - flags = in.readByte(); - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(page); - out.writeByte(flags); - } - - } - - - @SuppressWarnings("serial") - static class DeferredUpdate extends Update { - EncoderDecoder marshaller; - Object value; - - public DeferredUpdate(Update update) { - super(update); - } - public DeferredUpdate(int page) { - super(page); - } - - public static DeferredUpdate deferred(int page) { - return new DeferredUpdate(page); - } - - public static DeferredUpdate deferred(Update update) { - return new DeferredUpdate(update); - } - - public DeferredUpdate deferredUpdate() { - return this; - } - - public DeferredUpdate store(Object value, EncoderDecoder marshaller) { - this.value = value; - this.marshaller = marshaller; - flags = (byte) ((flags & ~PAGE_CLEAR) | PAGE_STORE); - return this; - } - - public DeferredUpdate clear(EncoderDecoder marshaller) { - this.marshaller= marshaller; - this.value=null; - flags = (byte) ((flags & ~PAGE_STORE) | PAGE_CLEAR); - return this; - } - - @SuppressWarnings("unchecked") - T value() { - return (T) value; - } - - @SuppressWarnings("unchecked") - public List store(Paged paged) { - return ((EncoderDecoder)marshaller).store(paged, page, value); - } - - public Object writeReplace() throws ObjectStreamException { - return new Update(this); - } - } } Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java?rev=829298&r1=829297&r2=829298&view=diff 
============================================================================== --- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java (original) +++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java Sat Oct 24 02:14:21 2009 @@ -32,7 +32,7 @@ } public HawtPageFileFactory() { - super.setHeaderSize(HawtPageFile.HEADER_SIZE); + super.setHeaderSize(HawtPageFile.FILE_HEADER_SIZE); } @Override Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java?rev=829298&r1=829297&r2=829298&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java (original) +++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java Sat Oct 24 02:14:21 2009 @@ -16,9 +16,6 @@ */ package org.apache.hawtdb.internal.page; -import static org.apache.hawtdb.internal.page.HawtPageFile.DeferredUpdate.deferred; -import static org.apache.hawtdb.internal.page.HawtPageFile.Update.update; - import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; @@ -30,9 +27,10 @@ import org.apache.hawtdb.api.OutOfSpaceException; import org.apache.hawtdb.api.PagingException; import org.apache.hawtdb.api.Transaction; -import org.apache.hawtdb.internal.page.HawtPageFile.DeferredUpdate; -import org.apache.hawtdb.internal.page.HawtPageFile.Snapshot; -import org.apache.hawtdb.internal.page.HawtPageFile.Update; + +import static org.apache.hawtdb.internal.page.Update.*; +import static org.apache.hawtdb.internal.page.DeferredUpdate.*; + /** * Transaction objects are NOT thread safe. 
Users of this object should * guard it from concurrent access. @@ -110,7 +108,7 @@ } // No? Then ask the snapshot to load the object. - return snapshot().head.cacheLoad(marshaller, page); + return snapshot().getHead().cacheLoad(marshaller, page); } public void put(EncoderDecoder marshaller, int page, T value) { @@ -166,7 +164,7 @@ parent.pageFile.read(update.page(), buffer); } else { // Get the data from the snapshot. - snapshot().head.read(pageId, buffer); + snapshot().getHead().read(pageId, buffer); } } @@ -178,7 +176,7 @@ return parent.pageFile.slice(type, udpate.page(), count); } else { // Get the data from the snapshot. - return snapshot().head.slice(page, count); + return snapshot().getHead().slice(page, count); } } else { @@ -186,7 +184,7 @@ if (update == null) { update = update(parent.allocator.alloc(count)).allocated(); if (type==SliceType.READ_WRITE) { - ByteBuffer slice = snapshot().head.slice(page, count); + ByteBuffer slice = snapshot().getHead().slice(page, count); try { parent.pageFile.write(update.page, slice); } finally { Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java?rev=829298&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java (added) +++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java Sat Oct 24 02:14:21 2009 @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hawtdb.internal.page; + +/** + * + * @author Hiram Chirino + */ +final class Snapshot { + + private final HawtPageFile parent; + private final SnapshotHead head; + private final Batch base; + + Snapshot(HawtPageFile hawtPageFile, SnapshotHead head, Batch base) { + parent = hawtPageFile; + this.head = head; + this.base = base; + } + + Snapshot open() { + head.open(base); + return this; + } + + void close() { + synchronized(parent.TRANSACTION_MUTEX) { + head.close(base); + } + } + + SnapshotHead getHead() { + return head; + } +} \ No newline at end of file Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java?rev=829298&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java (added) +++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java Sat Oct 24 02:14:21 2009 @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hawtdb.internal.page; + +import java.nio.ByteBuffer; +import java.util.Map; + +import org.apache.activemq.util.buffer.Buffer; +import org.apache.hawtdb.api.EncoderDecoder; +import org.apache.hawtdb.api.IOPagingException; +import org.apache.hawtdb.api.Paged.SliceType; + +/** + * + * A SnapshotHead is a BatchEntry stored in a Batch, in the same + * list as the Commit objects. Its main purpose is to separate + * commits so that Commits after the snapshot do not get merged with commits + * before the snapshot. + * + * A SnapshotHead allows transactions to get a point-in-time view of the + * page file. + * + * @author Hiram Chirino + */ +final class SnapshotHead extends BatchEntry { + /** + * + */ + private final HawtPageFile hawtPageFile; + final Batch parent; + + public SnapshotHead(HawtPageFile hawtPageFile, Batch parent) { + this.hawtPageFile = hawtPageFile; + this.parent = parent; + } + + /** The number of times this snapshot has been opened.
*/ + protected int references; + + public String toString() { + return "{ references: "+this.references+" }"; + } + + SnapshotHead isSnapshotHead() { + return this; + } + + public void read(int pageId, Buffer buffer) throws IOPagingException { + pageId = mapPageId(pageId); + this.hawtPageFile.pageFile.read(pageId, buffer); + } + + public ByteBuffer slice(int pageId, int count) { + pageId = mapPageId(pageId); + return this.hawtPageFile.pageFile.slice(SliceType.READ, pageId, count); + } + + public void open(Batch base) { + references++; + while( true ) { + base.references++; + if(base == parent ) { + break; + } + base = base.getNext(); + } + } + + public void close(Batch base) { + references--; + while( true ) { + base.references--; + if(base == parent ) { + break; + } + base = base.getNext(); + } + + if( references==0 ) { + unlink(); + // TODO: trigger merging of adjacent commits. + } + } + + public int mapPageId(int page) { + // Look for the page in the previous commits.. + Batch curRedo = parent; + BatchEntry curEntry = getPrevious(); + while( true ) { + if( curRedo.isPerformed() ) { + break; + } + + while( curEntry!=null ) { + Commit commit = curEntry.isCommit(); + if( commit !=null ) { + Update update = commit.updates.get(page); + if( update!=null ) { + return update.page(); + } + } + curEntry = curEntry.getPrevious(); + } + + curRedo = curRedo.getPrevious(); + if( curRedo==null ) { + break; + } + curEntry = curRedo.entries.getTail(); + } + return page; + } + + + public T cacheLoad(EncoderDecoder marshaller, int page) { + Batch curRedo = parent; + BatchEntry curEntry = getPrevious(); + while( true ) { + if( curRedo.isPerformed() ) { + break; + } + + while( curEntry!=null ) { + Commit commit = curEntry.isCommit(); + if( commit !=null ) { + Update update = commit.updates.get(page); + if( update!=null ) { + DeferredUpdate du = update.deferredUpdate(); + if (du!=null) { + return du.value(); + } + } + } + curEntry = curEntry.getPrevious(); + } + + curRedo = 
curRedo.getPrevious(); + if( curRedo==null ) { + break; + } + curEntry = curRedo.entries.getTail(); + } + return this.hawtPageFile.readCache.cacheLoad(marshaller, page); + } + + public long commitCheck(Map pageUpdates) { + long rc=0; + Batch curRedo = parent; + BatchEntry curEntry = getNext(); + while( true ) { + while( curEntry!=null ) { + Commit commit = curEntry.isCommit(); + if( commit!=null ) { + rc = commit.commitCheck(pageUpdates); + } + curEntry = curEntry.getNext(); + } + + curRedo = curRedo.getNext(); + if( curRedo==null ) { + break; + } + curEntry = curRedo.entries.getHead(); + } + return rc; + } + +} \ No newline at end of file Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java?rev=829298&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java (added) +++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java Sat Oct 24 02:14:21 2009 @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hawtdb.internal.page; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +import org.apache.hawtdb.api.PagingException; + +/** + * + * @author Hiram Chirino + */ +class Update implements Externalizable { + + public static final byte PAGE_ALLOCATED = 0x01 << 0; + public static final byte PAGE_FREED = 0x01 << 1; + public static final byte PAGE_STORE = 0x01 << 2; + public static final byte PAGE_CLEAR = 0x01 << 3; + + private static final long serialVersionUID = -1128410792448869134L; + + byte flags; + int page; + + public Update() { + } + + public Update(Update udpate) { + this.page = udpate.page; + this.flags = (byte) (udpate.flags & (PAGE_ALLOCATED|PAGE_FREED)); + } + + public Update(int page) { + this.page = page; + } + + public static Update update(Update update) { + return new Update(update); + } + public static Update update(int page) { + return new Update(page); + } + + public int page() { + if( wasFreed() ) { + throw new PagingException("You should never try to read or write page that has been freed."); + } + return page; + } + + public DeferredUpdate deferredUpdate() { + return null; + } + + public Update allocated() { + flags = (byte) ((flags & ~PAGE_FREED) | PAGE_ALLOCATED); + return this; + } + + public Update freed() { + flags = (byte) ((flags & ~PAGE_ALLOCATED) | PAGE_FREED); + return this; + } + + public boolean wasFreed() { + return (flags & PAGE_FREED)!=0 ; + } + public boolean wasAllocated() { + return (flags & PAGE_ALLOCATED)!=0; + } + public boolean wasDeferredStore() { + return (flags & PAGE_STORE)!=0 ; + } + public boolean wasDeferredClear() { + return (flags & PAGE_CLEAR)!=0; + } + + @Override + public String toString() { + return "{ page: "+page+", flags: "+flags+" }"; + } + + @Override + public void readExternal(ObjectInput in) throws 
IOException, ClassNotFoundException { + page = in.readInt(); + flags = in.readByte(); + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(page); + out.writeByte(flags); + } + +} \ No newline at end of file