Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 5309 invoked from network); 25 Nov 2006 06:01:52 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 25 Nov 2006 06:01:52 -0000 Received: (qmail 38244 invoked by uid 500); 25 Nov 2006 06:02:02 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 38226 invoked by uid 500); 25 Nov 2006 06:02:02 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 38215 invoked by uid 99); 25 Nov 2006 06:02:01 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Nov 2006 22:02:01 -0800 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Nov 2006 22:01:49 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id BCF6C1A9846; Fri, 24 Nov 2006 22:00:58 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r479089 [1/2] - in /incubator/activemq/trunk: ./ activemq-core/ activemq-core/src/main/java/org/apache/activemq/kaha/impl/ activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ activemq-core/src/main/java/org/apache/activemq/kaha... Date: Sat, 25 Nov 2006 06:00:57 -0000 To: activemq-commits@geronimo.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061125060058.BCF6C1A9846@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Fri Nov 24 22:00:56 2006 New Revision: 479089 URL: http://svn.apache.org/viewvc?view=rev&rev=479089 Log: Added a new org.apache.activemq.kaha.impl.asyc package that holds data manager/journal that implements both the Kaha DataManager and ActiveIO Journal interfaces. - Initial bench marks show it to be as fast or faster than the default ActiveIO Journal. - The bigest differentiator is that this implementation of the journal was built to also provide fast reads. - The DataManager interface was extracted and now the KahaStore can switch between the original DataManager implementation and the new implementation in the kaha.impl.async packagge. - Simplified the original implementation by removing the AsyncDataWriters stuff since this is largely what the new package is based on. Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java (with props) incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java - copied, changed from r477680, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalImplTest.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalPerfTool.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalRWPerfTool.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/LocationTest.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/LinkedNodeTest.java Removed: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileReader.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileWriter.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java Modified: incubator/activemq/trunk/activemq-core/pom.xml incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java incubator/activemq/trunk/pom.xml Modified: incubator/activemq/trunk/activemq-core/pom.xml URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/pom.xml?view=diff&rev=479089&r1=479088&r2=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/pom.xml (original) +++ incubator/activemq/trunk/activemq-core/pom.xml Fri Nov 24 22:00:56 2006 @@ -48,6 +48,12 @@ activeio-core false + + ${pom.groupId} + activeio-core + false + test-jar + org.apache.geronimo.specs Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java?view=auto&rev=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java Fri Nov 24 22:00:56 2006 @@ -0,0 +1,42 @@ +package org.apache.activemq.kaha.impl; + +import java.io.IOException; + +import org.apache.activemq.kaha.Marshaller; +import org.apache.activemq.kaha.StoreLocation; +import org.apache.activemq.kaha.impl.data.RedoListener; + +public interface DataManager { + + String getName(); + + Object readItem(Marshaller marshaller, StoreLocation item) + throws IOException; + + StoreLocation storeDataItem(Marshaller marshaller, Object payload) + throws IOException; + + StoreLocation storeRedoItem(Object payload) throws IOException; + + void updateItem(StoreLocation location, Marshaller marshaller, + Object payload) throws IOException; + + void recoverRedoItems(RedoListener listener) throws IOException; + + void close() throws IOException; + + void force() throws IOException; + + boolean delete() throws IOException; + + void addInterestInFile(int file) throws IOException; + + void removeInterestInFile(int file) throws IOException; + + void consolidateDataFiles() throws IOException; + + Marshaller getRedoMarshaller(); + + void setRedoMarshaller(Marshaller redoMarshaller); + +} \ No newline at end of file Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java?view=diff&rev=479089&r1=479088&r2=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java Fri Nov 24 22:00:56 2006 @@ -27,7 +27,6 @@ import org.apache.activemq.kaha.StoreEntry; import org.apache.activemq.kaha.StoreLocation; import org.apache.activemq.kaha.impl.container.ContainerId; -import org.apache.activemq.kaha.impl.data.DataManager; import org.apache.activemq.kaha.impl.data.Item; import org.apache.activemq.kaha.impl.index.IndexItem; import org.apache.activemq.kaha.impl.index.IndexManager; Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?view=diff&rev=479089&r1=479088&r2=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Fri Nov 24 22:00:56 2006 @@ -33,11 +33,13 @@ import org.apache.activemq.kaha.RuntimeStoreException; import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreLocation; +import org.apache.activemq.kaha.impl.async.AsyncDataManager; +import org.apache.activemq.kaha.impl.async.DataManagerFacade; import org.apache.activemq.kaha.impl.container.BaseContainerImpl; import org.apache.activemq.kaha.impl.container.ContainerId; import org.apache.activemq.kaha.impl.container.ListContainerImpl; import org.apache.activemq.kaha.impl.container.MapContainerImpl; -import org.apache.activemq.kaha.impl.data.DataManager; +import org.apache.activemq.kaha.impl.data.DataManagerImpl; import org.apache.activemq.kaha.impl.data.Item; import org.apache.activemq.kaha.impl.data.RedoListener; import org.apache.activemq.kaha.impl.index.IndexItem; @@ -73,8 +75,8 @@ private String mode; private boolean initialized; private boolean logIndexChanges=false; - private boolean useAsyncWriter=false; - private long maxDataFileLength=DataManager.MAX_FILE_LENGTH; + private boolean useAsyncDataManager=false; + private long maxDataFileLength=1024*1024*32; private FileLock lock; private String indexType=IndexTypes.DISK_INDEX; @@ -319,10 +321,21 @@ public synchronized DataManager getDataManager(String name) throws IOException{ DataManager dm=(DataManager)dataManagers.get(name); if(dm==null){ - dm=new DataManager(directory,name); - dm.setMaxFileLength(maxDataFileLength); - dm.setUseAsyncWriter(isUseAsyncWriter()); - recover(dm); + if( isUseAsyncDataManager() ) { + AsyncDataManager t=new AsyncDataManager(); + t.setDirectory(directory); + t.setFilePrefix("data-"+name+"-"); + t.setMaxFileLength((int) maxDataFileLength); + t.start(); + dm=new DataManagerFacade(t, name); + } else { + DataManagerImpl t=new DataManagerImpl(directory,name); + t.setMaxFileLength(maxDataFileLength); + dm=t; + } + if( logIndexChanges ) { + recover(dm); + } dataManagers.put(name,dm); } return dm; @@ -339,7 +352,6 @@ private void recover(final DataManager dm) throws IOException{ dm.recoverRedoItems(new RedoListener(){ - public void onRedoItem(StoreLocation item,Object o) throws Exception{ RedoStoreIndexItem redo=(RedoStoreIndexItem)o; // IndexManager im = getIndexManager(dm, redo.getIndexName()); @@ -531,12 +543,12 @@ } } - public synchronized boolean isUseAsyncWriter() { - return useAsyncWriter; + public synchronized boolean isUseAsyncDataManager() { + return useAsyncDataManager; } - public synchronized void setUseAsyncWriter(boolean useAsyncWriter) { - this.useAsyncWriter = useAsyncWriter; + public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) { + this.useAsyncDataManager = useAsyncWriter; } Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?view=auto&rev=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Fri Nov 24 22:00:56 2006 @@ -0,0 +1,481 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand; +import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey; +import org.apache.activemq.util.ByteSequence; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Manages DataFiles + * + * @version $Revision: 1.1.1.1 $ + */ +public final class AsyncDataManager { + + private static final Log log=LogFactory.getLog(AsyncDataManager.class); + + public static int CONTROL_RECORD_MAX_LENGTH=1024; + + public static final int ITEM_HEAD_RESERVED_SPACE=21; + // ITEM_HEAD_SPACE = length + type+ reserved space + SOR + public static final int ITEM_HEAD_SPACE=4+1+ITEM_HEAD_RESERVED_SPACE+3; + public static final int ITEM_HEAD_OFFSET_TO_SOR=ITEM_HEAD_SPACE-3; + public static final int ITEM_FOOT_SPACE=3; // EOR + + public static final int ITEM_HEAD_FOOT_SPACE=ITEM_HEAD_SPACE+ITEM_FOOT_SPACE; + + public static final byte[] ITEM_HEAD_SOR=new byte[]{'S', 'O', 'R'}; // + public static final byte[] ITEM_HEAD_EOR=new byte[]{'E', 'O', 'R'}; // + + public static final byte DATA_ITEM_TYPE=1; + public static final byte REDO_ITEM_TYPE=2; + + public static String DEFAULT_DIRECTORY="data"; + public static String DEFAULT_FILE_PREFIX="data-"; + public static int DEFAULT_MAX_FILE_LENGTH=1024*1024*32; + + private File directory = new File(DEFAULT_DIRECTORY); + private String filePrefix=DEFAULT_FILE_PREFIX; + private int maxFileLength = DEFAULT_MAX_FILE_LENGTH; + private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH-1024*512; + + private DataFileAppender appender; + private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this); + + private Map fileMap=new HashMap(); + private DataFile currentWriteFile; + ControlFile controlFile; + + private Location mark; + private final AtomicReference lastAppendLocation = new AtomicReference(); + boolean started = false; + boolean useNio = true; + + protected final ConcurrentHashMap inflightWrites = new ConcurrentHashMap(); + + @SuppressWarnings("unchecked") + public synchronized void start() throws IOException { + if( started ) { + return; + + } + + started=true; + directory.mkdirs(); + controlFile = new ControlFile(new File(directory, filePrefix+"control"), CONTROL_RECORD_MAX_LENGTH); + controlFile.lock(); + + ByteSequence sequence = controlFile.load(); + if( sequence != null && sequence.getLength()>0 ) { + unmarshallState(sequence); + } + if( useNio) { + appender = new NIODataFileAppender(this); + } else { + appender = new DataFileAppender(this); + } + + File[] files=directory.listFiles(new FilenameFilter(){ + public boolean accept(File dir,String n){ + return dir.equals(dir)&&n.startsWith(filePrefix); + } + }); + + if(files!=null){ + for(int i=0;i l = new ArrayList(fileMap.values()); + Collections.sort(l); + currentWriteFile=null; + for (DataFile df : l) { + if( currentWriteFile!=null ) { + currentWriteFile.linkAfter(df); + } + currentWriteFile=df; + } + } + + // Need to check the current Write File to see if there was a partial write to it. + if( currentWriteFile!=null ) { + + // See if the lastSyncedLocation is valid.. + Location l = lastAppendLocation.get(); + if( l!=null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue() ) { + l=null; + } + + // If we know the last location that was ok.. then we can skip lots of checking + l = recoveryCheck(currentWriteFile, l); + lastAppendLocation.set(l); + } + + storeState(false); + } + + private Location recoveryCheck(DataFile dataFile, Location location) throws IOException { + if( location == null ) { + location = new Location(); + location.setDataFileId(dataFile.getDataFileId()); + location.setOffset(0); + } + DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); + try { + reader.readLocationDetails(location); + while( reader.readLocationDetailsAndValidate(location) ) { + location.setOffset(location.getOffset()+location.getSize()); + } + } finally { + accessorPool.closeDataFileAccessor(reader); + } + dataFile.setLength(location.getOffset()); + return location; + } + + private void unmarshallState(ByteSequence sequence) throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength()); + DataInputStream dis = new DataInputStream(bais); + if( dis.readBoolean() ) { + mark = new Location(); + mark.readExternal(dis); + } else { + mark = null; + } + if( dis.readBoolean() ) { + Location l = new Location(); + l.readExternal(dis); + lastAppendLocation.set(l); + } else { + lastAppendLocation.set(null); + } + } + + private ByteSequence marshallState() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + + if( mark!=null ) { + dos.writeBoolean(true); + mark.writeExternal(dos); + } else { + dos.writeBoolean(false); + } + Location l = lastAppendLocation.get(); + if( l!=null ) { + dos.writeBoolean(true); + l.writeExternal(dos); + } else { + dos.writeBoolean(false); + } + + byte[] bs = baos.toByteArray(); + return new ByteSequence(bs,0,bs.length); + } + + synchronized DataFile allocateLocation(Location location) throws IOException{ + if(currentWriteFile==null||((currentWriteFile.getLength()+location.getSize())>maxFileLength)){ + int nextNum=currentWriteFile!=null?currentWriteFile.getDataFileId().intValue()+1:1; + + String fileName=filePrefix+nextNum; + DataFile nextWriteFile=new DataFile(new File(directory,fileName),nextNum, preferedFileLength); + fileMap.put(nextWriteFile.getDataFileId(),nextWriteFile); + if( currentWriteFile!=null ) { + currentWriteFile.linkAfter(nextWriteFile); + if(currentWriteFile.isUnused()){ + removeDataFile(currentWriteFile); + } + } + currentWriteFile=nextWriteFile; + + } + location.setOffset(currentWriteFile.getLength()); + location.setDataFileId(currentWriteFile.getDataFileId().intValue()); + currentWriteFile.incrementLength(location.getSize()); + currentWriteFile.increment(); + return currentWriteFile; + } + + DataFile getDataFile(Location item) throws IOException{ + Integer key=new Integer(item.getDataFileId()); + DataFile dataFile=(DataFile) fileMap.get(key); + if(dataFile==null){ + log.error("Looking for key " + key + " but not found in fileMap: " + fileMap); + throw new IOException("Could not locate data file "+filePrefix+"-"+item.getDataFileId()); + } + return dataFile; + } + + private DataFile getNextDataFile(DataFile dataFile) { + return (DataFile) dataFile.getNext(); + } + + public synchronized void close() throws IOException{ + accessorPool.close(); + storeState(false); + appender.close(); + fileMap.clear(); + controlFile.unlock(); + controlFile.dispose(); + } + + public synchronized boolean delete() throws IOException{ + boolean result=true; + for(Iterator i=fileMap.values().iterator();i.hasNext();){ + DataFile dataFile=(DataFile) i.next(); + result&=dataFile.delete(); + } + fileMap.clear(); + return result; + } + + public synchronized void addInterestInFile(int file) throws IOException{ + if(file>=0){ + Integer key=new Integer(file); + DataFile dataFile=(DataFile) fileMap.get(key); + if(dataFile==null){ + throw new IOException("That data file does not exist"); + } + addInterestInFile(dataFile); + } + } + + synchronized void addInterestInFile(DataFile dataFile){ + if(dataFile!=null){ + dataFile.increment(); + } + } + + public synchronized void removeInterestInFile(int file) throws IOException{ + if(file>=0){ + Integer key=new Integer(file); + DataFile dataFile=(DataFile) fileMap.get(key); + removeInterestInFile(dataFile); + } + } + + synchronized void removeInterestInFile(DataFile dataFile) throws IOException{ + if(dataFile!=null){ + if(dataFile.decrement()<=0){ + if(dataFile!=currentWriteFile){ + removeDataFile(dataFile); + } + } + } + } + + public synchronized void consolidateDataFiles() throws IOException{ + List purgeList=new ArrayList(); + for (DataFile dataFile : fileMap.values()) { + if(dataFile.isUnused() && dataFile != currentWriteFile){ + purgeList.add(dataFile); + } + } + for (DataFile dataFile : purgeList) { + removeDataFile(dataFile); + } + } + + private void removeDataFile(DataFile dataFile) throws IOException{ + fileMap.remove(dataFile.getDataFileId()); + dataFile.unlink(); + boolean result=dataFile.delete(); + log.debug("discarding data file "+dataFile+(result?"successful ":"failed")); + } + + /** + * @return the maxFileLength + */ + public int getMaxFileLength(){ + return maxFileLength; + } + + /** + * @param maxFileLength the maxFileLength to set + */ + public void setMaxFileLength(int maxFileLength){ + this.maxFileLength=maxFileLength; + } + + public String toString(){ + return "DataManager:("+filePrefix+")"; + } + + public synchronized Location getMark() throws IllegalStateException { + return mark; + } + + public Location getNextLocation(Location location) throws IOException, IllegalStateException { + + + Location cur = null; + while( true ) { + if( cur == null ) { + if( location == null ) { + DataFile head = (DataFile) currentWriteFile.getHeadNode(); + cur = new Location(); + cur.setDataFileId(head.getDataFileId()); + cur.setOffset(0); + +// DataFileAccessor reader = accessorPool.openDataFileAccessor(head); +// try { +// if( !reader.readLocationDetailsAndValidate(cur) ) { +// return null; +// } +// } finally { +// accessorPool.closeDataFileAccessor(reader); +// } + } else { + // Set to the next offset.. + cur = new Location(location); + cur.setOffset(cur.getOffset()+cur.getSize()); + } + } else { + cur.setOffset(cur.getOffset()+cur.getSize()); + } + + DataFile dataFile = getDataFile(cur); + + // Did it go into the next file?? + if( dataFile.getLength() <= cur.getOffset() ) { + dataFile = getNextDataFile(dataFile); + if( dataFile == null ) { + return null; + } else { + cur.setDataFileId(dataFile.getDataFileId().intValue()); + cur.setOffset(0); + } + } + + // Load in location size and type. + DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); + try { + reader.readLocationDetails(cur); + } finally { + accessorPool.closeDataFileAccessor(reader); + } + + if( cur.getType() == 0 ) { + return null; + } else if( cur.getType() > 0 ) { + // Only return user records. + return cur; + } + } + } + + public ByteSequence read(Location location) throws IOException, IllegalStateException { + DataFile dataFile = getDataFile(location); + DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); + ByteSequence rc=null; + try { + rc = reader.readRecord(location); + } finally { + accessorPool.closeDataFileAccessor(reader); + } + return rc; + } + + public synchronized void setMark(Location location, boolean sync) throws IOException, IllegalStateException { + mark = location; + storeState(sync); + } + + private void storeState(boolean sync) throws IOException { + ByteSequence state = marshallState(); + appender.storeItem(state, Location.MARK_TYPE, sync); + controlFile.store(state, sync); + } + + public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException { + return appender.storeItem(data, Location.USER_TYPE, sync); + } + + public Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException { + return appender.storeItem(data, type, sync); + } + + public void update(Location location, ByteSequence data, boolean sync) throws IOException { + DataFile dataFile = getDataFile(location); + DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile); + try { + updater.updateRecord(location, data, sync); + } finally { + accessorPool.closeDataFileAccessor(updater); + } + } + + public File getDirectory() { + return directory; + } + + public void setDirectory(File directory) { + this.directory = directory; + } + + public String getFilePrefix() { + return filePrefix; + } + + public void setFilePrefix(String filePrefix) { + this.filePrefix = filePrefix; + } + + public ConcurrentHashMap getInflightWrites() { + return inflightWrites; + } + + public Location getLastAppendLocation() { + return lastAppendLocation.get(); + } + + public void setLastAppendLocation(Location lastSyncedLocation) { + this.lastAppendLocation.set(lastSyncedLocation); + } + + +} Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java?view=auto&rev=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java Fri Nov 24 22:00:56 2006 @@ -0,0 +1,161 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileLock; + +import org.apache.activemq.util.ByteSequence; + +/** + * Use to reliably store fixed sized state data. It stores the state in + * record that is versioned and repeated twice in the file so that a failure in the + * middle of the write of the first or second record do not not result in an unknown + * state. + * + * @version $Revision: 1.1 $ + */ +final public class ControlFile { + + private final static boolean DISABLE_FILE_LOCK = "true".equals(System.getProperty("java.nio.channels.FileLock.broken", "false")); + private final File file; + + /** The File that holds the control data. */ + private final RandomAccessFile randomAccessFile; + private final int maxRecordSize; + + private long version=0; + private FileLock lock; + private boolean disposed; + + + public ControlFile(File file, int recordSize) throws IOException { + this.file = file; + this.maxRecordSize = recordSize+4; + randomAccessFile = new RandomAccessFile(file, "rw"); + } + + /** + * Locks the control file. + * @throws IOException + */ + public void lock() throws IOException { + if( DISABLE_FILE_LOCK ) + return; + + if( lock == null ) { + lock = randomAccessFile.getChannel().tryLock(); + if (lock == null) { + throw new IOException("Control file '"+file+"' could not be locked."); + } + } + } + + /** + * Un locks the control file. + * + * @throws IOException + */ + public void unlock() throws IOException { + if( DISABLE_FILE_LOCK ) + return; + + if (lock != null) { + lock.release(); + lock = null; + } + } + + public void dispose() { + if( disposed ) + return; + disposed=true; + try { + unlock(); + } catch (IOException e) { + } + try { + randomAccessFile.close(); + } catch (IOException e) { + } + } + + synchronized public ByteSequence load() throws IOException { + long l = randomAccessFile.length(); + if( l < maxRecordSize ) { + return null; + } + + randomAccessFile.seek(0); + long v1 = randomAccessFile.readLong(); + randomAccessFile.seek(maxRecordSize+8); + long v1check = randomAccessFile.readLong(); + + randomAccessFile.seek(maxRecordSize+16); + long v2 = randomAccessFile.readLong(); + randomAccessFile.seek((maxRecordSize*2)+24); + long v2check = randomAccessFile.readLong(); + + byte[] data=null; + if( v2 == v2check ) { + version = v2; + randomAccessFile.seek(maxRecordSize+24); + int size = randomAccessFile.readInt(); + data = new byte[size]; + randomAccessFile.readFully(data); + } else if ( v1 == v1check ){ + version = v1; + randomAccessFile.seek(maxRecordSize+8); + int size = randomAccessFile.readInt(); + data = new byte[size]; + randomAccessFile.readFully(data); + } else { + // Bummer.. Both checks are screwed. we don't know + // if any of the two buffer are ok. This should + // only happen is data got corrupted. + throw new IOException("Control data corrupted."); + } + return new ByteSequence(data,0,data.length); + } + + public void store(ByteSequence data, boolean sync) throws IOException { + + version++; + randomAccessFile.setLength((maxRecordSize*2)+32); + randomAccessFile.seek(0); + + // Write the first copy of the control data. + randomAccessFile.writeLong(version); + randomAccessFile.writeInt(data.getLength()); + randomAccessFile.write(data.getData()); + randomAccessFile.writeLong(version); + + // Write the second copy of the control data. + randomAccessFile.writeLong(version); + randomAccessFile.writeInt(data.getLength()); + randomAccessFile.write(data.getData()); + randomAccessFile.writeLong(version); + + if( sync ) { + randomAccessFile.getFD().sync(); + } + } + +} Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java?view=auto&rev=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java Fri Nov 24 22:00:56 2006 @@ -0,0 +1,108 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; + +import org.apache.activemq.util.LinkedNode; +/** + * DataFile + * + * @version $Revision: 1.1.1.1 $ + */ +class DataFile extends LinkedNode implements Comparable { + + private final File file; + private final Integer dataFileId; + private final int preferedSize; + + int length=0; + private int referenceCount; + + DataFile(File file, int number, int preferedSize){ + this.file=file; + this.preferedSize = preferedSize; + this.dataFileId=new Integer(number); + length=(int)(file.exists()?file.length():0); + } + + public Integer getDataFileId(){ + return dataFileId; + } + + public synchronized int getLength(){ + return length; + } + public void setLength(int length) { + this.length=length; + } + public synchronized void incrementLength(int size){ + length+=size; + } + + public synchronized int increment(){ + return ++referenceCount; + } + + public synchronized int decrement(){ + return --referenceCount; + } + + public synchronized boolean isUnused(){ + return referenceCount<=0; + } + + public synchronized String toString(){ + String result = file.getName() + " number = " + dataFileId + " , length = " + length + " refCount = " + referenceCount; + return result; + } + + public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException { + RandomAccessFile rc=new RandomAccessFile(file,"rw"); + // When we start to write files size them up so that the OS has a chance + // to allocate the file contigously. + if( appender ){ + if( length < preferedSize ) { + rc.setLength(preferedSize); + } + } + return rc; + } + + public void closeRandomAccessFile(RandomAccessFile file) throws IOException { + // On close set the file size to the real size. + if( length != file.length() ) { + file.setLength(getLength()); + file.close(); + } + } + + public synchronized boolean delete() throws IOException{ + return file.delete(); + } + + public int compareTo(Object o) { + DataFile df = (DataFile) o; + return dataFileId - df.dataFileId; + } + + + +} Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java?view=auto&rev=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java Fri Nov 24 22:00:56 2006 @@ -0,0 +1,143 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand; +import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey; +import org.apache.activemq.util.ByteSequence; +/** + * Optimized Store reader and updater. Single threaded and synchronous. Use in conjunction + * with the DataFileAccessorPool of concurrent use. + * + * @version $Revision: 1.1.1.1 $ + */ +final class DataFileAccessor { + + private final DataFile dataFile; + private final ConcurrentHashMap inflightWrites; + private final RandomAccessFile file; + private boolean disposed; + + /** + * Construct a Store reader + * + * @param file + * @throws IOException + */ + public DataFileAccessor(AsyncDataManager dataManager, DataFile dataFile) throws IOException{ + this.dataFile = dataFile; + this.inflightWrites = dataManager.getInflightWrites(); + this.file = dataFile.openRandomAccessFile(false); + } + + public DataFile getDataFile() { + return dataFile; + } + + public void dispose() { + if( disposed ) + return; + disposed=true; + try { + dataFile.closeRandomAccessFile(file); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public ByteSequence readRecord(Location location) throws IOException { + + if( !location.isValid() || location.getSize()==Location.NOT_SET ) + throw new IOException("Invalid location: "+location); + + WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location)); + if( asyncWrite!= null ) { + return asyncWrite.data; + } + + try { + byte[] data=new byte[location.getSize()-AsyncDataManager.ITEM_HEAD_FOOT_SPACE]; + file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE); + file.readFully(data); + return new ByteSequence(data, 0, data.length); + } catch (RuntimeException e) { + throw new IOException("Invalid location: "+location+", : "+e); + } + } + + public void readLocationDetails(Location location) throws IOException { + WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location)); + if( asyncWrite!= null ) { + location.setSize(asyncWrite.location.getSize()); + location.setType(asyncWrite.location.getType()); + } else { + file.seek(location.getOffset()); + location.setSize(file.readInt()); + location.setType(file.readByte()); + } + } + + public boolean readLocationDetailsAndValidate(Location location) { + try { + WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(location)); + if( asyncWrite!= null ) { + location.setSize(asyncWrite.location.getSize()); + location.setType(asyncWrite.location.getType()); + } else { + file.seek(location.getOffset()); + location.setSize(file.readInt()); + location.setType(file.readByte()); + + byte data[] = new byte[3]; + file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_OFFSET_TO_SOR); + file.readFully(data); + if( data[0] != AsyncDataManager.ITEM_HEAD_SOR[0] || + data[1] != AsyncDataManager.ITEM_HEAD_SOR[1] || + data[2] != AsyncDataManager.ITEM_HEAD_SOR[2] ) { + return false; + } + file.seek(location.getOffset()+location.getSize()-AsyncDataManager.ITEM_FOOT_SPACE); + file.readFully(data); + if( data[0] != AsyncDataManager.ITEM_HEAD_EOR[0] || + data[1] != AsyncDataManager.ITEM_HEAD_EOR[1] || + data[2] != AsyncDataManager.ITEM_HEAD_EOR[2] ) { + return false; + } + } + } catch (IOException e) { + return false; + } + return true; + } + + public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException { + + file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE); + int size = Math.min(data.getLength(), location.getSize()); + file.write(data.getData(), data.getOffset(), size); + if( sync ) { + file.getFD().sync(); + } + + } + +} Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java?view=auto&rev=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java Fri Nov 24 22:00:56 2006 @@ -0,0 +1,138 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; + +/** + * Used to pool DataFileAccessors. + * + * @author chirino + */ +public class DataFileAccessorPool { + + private final AsyncDataManager dataManager; + private final HashMap pools = new HashMap(); + private boolean closed=false; + + int MAX_OPEN_READERS_PER_FILE=5; + + class Pool { + private final DataFile file; + private final ArrayList pool = new ArrayList(); + private boolean used; + + public Pool(DataFile file) { + this.file = file; + } + + public DataFileAccessor openDataFileReader() throws IOException { + DataFileAccessor rc=null; + if( pool.isEmpty() ) { + rc = new DataFileAccessor(dataManager, file); + } else { + rc = (DataFileAccessor) pool.remove(pool.size()-1); + } + used=true; + return rc; + } + + public void closeDataFileReader(DataFileAccessor reader) { + used=true; + if(pool.size() >= MAX_OPEN_READERS_PER_FILE ) { + reader.dispose(); + } else { + pool.add(reader); + } + } + + public void clearUsedMark() { + used=false; + } + + public boolean isUsed() { + return used; + } + + public void dispose() { + for (DataFileAccessor reader : pool) { + reader.dispose(); + } + pool.clear(); + } + + } + + public DataFileAccessorPool(AsyncDataManager dataManager){ + this.dataManager=dataManager; + } + + synchronized void clearUsedMark() { + for (Iterator iter = pools.values().iterator(); iter.hasNext();) { + Pool pool = (Pool) iter.next(); + pool.clearUsedMark(); + } + } + + synchronized void disposeUnused() { + for (Iterator iter = pools.values().iterator(); iter.hasNext();) { + Pool pool = iter.next(); + if( !pool.isUsed() ) { + pool.dispose(); + iter.remove(); + } + } + } + + synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException { + if( closed ) { + throw new IOException("Closed."); + } + + Pool pool = pools.get(dataFile.getDataFileId()); + if( pool == null ) { + pool = new Pool(dataFile); + pools.put(dataFile.getDataFileId(), pool); + } + return pool.openDataFileReader(); + } + + synchronized void closeDataFileAccessor(DataFileAccessor reader) { + Pool pool = pools.get(reader.getDataFile().getDataFileId()); + if( pool == null || closed ) { + reader.dispose(); + } else { + pool.closeDataFileReader(reader); + } + } + + synchronized public void close() { + if(closed) + return; + closed=true; + for (Iterator iter = pools.values().iterator(); iter.hasNext();) { + Pool pool = iter.next(); + pool.dispose(); + } + pools.clear(); + } + +} Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java?view=auto&rev=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java Fri Nov 24 22:00:56 2006 @@ -0,0 +1,380 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.RandomAccessFile; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.DataByteArrayOutputStream; +import org.apache.activemq.util.LinkedNode; + +/** + * An optimized writer to do batch appends to a data file. This object is thread safe + * and gains throughput as you increase the number of concurrent writes it does. + * + * @version $Revision: 1.1.1.1 $ + */ +class DataFileAppender { + + protected static final byte []RESERVED_SPACE= new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE]; + protected static final String SHUTDOWN_COMMAND = "SHUTDOWN"; + int MAX_WRITE_BATCH_SIZE = 1024*1024*4; + + static public class WriteKey { + private final int file; + private final long offset; + private final int hash; + + public WriteKey(Location item){ + file = item.getDataFileId(); + offset = item.getOffset(); + // TODO: see if we can build a better hash + hash = (int) (file ^ offset); + } + + public int hashCode() { + return hash; + } + + public boolean equals(Object obj) { + WriteKey di = (WriteKey)obj; + return di.file == file && di.offset == offset; + } + } + + public class WriteBatch { + + public final DataFile dataFile; + public final WriteCommand first; + public CountDownLatch latch; + public int size; + + public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException { + this.dataFile=dataFile; + this.first=write; + size+=write.location.getSize(); + if( write.sync ) { + latch = new CountDownLatch(1); + } + } + + public boolean canAppend(DataFile dataFile, WriteCommand write) { + if( dataFile != this.dataFile ) + return false; + if( size+write.location.getSize() >= MAX_WRITE_BATCH_SIZE ) + return false; + return true; + } + + public void append(WriteCommand write) throws IOException { + this.first.getTailNode().linkAfter(write); + size+=write.location.getSize(); + if( write.sync && latch==null ) { + latch = new CountDownLatch(1); + } + } + } + + public static class WriteCommand extends LinkedNode { + public final Location location; + public final ByteSequence data; + final boolean sync; + + public WriteCommand(Location location, ByteSequence data, boolean sync) { + this.location = location; + this.data = data; + this.sync = sync; + } + } + + protected final AsyncDataManager dataManager; + + protected final ConcurrentHashMap inflightWrites; + + protected final Object enqueueMutex = new Object(); + protected WriteBatch nextWriteBatch; + + private boolean running; + protected boolean shutdown; + protected IOException firstAsyncException; + protected final CountDownLatch shutdownDone = new CountDownLatch(1); + private Thread thread; + + /** + * Construct a Store writer + * + * @param file + */ + public DataFileAppender(AsyncDataManager dataManager){ + this.dataManager=dataManager; + this.inflightWrites = this.dataManager.getInflightWrites(); + } + + /** + * @param type + * @param marshaller + * @param payload + * @param type + * @param sync + * @return + * @throws IOException + * @throws + * @throws + */ + public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException { + + // Write the packet our internal buffer. + int size = data.getLength()+AsyncDataManager.ITEM_HEAD_FOOT_SPACE; + + final Location location=new Location(); + location.setSize(size); + location.setType(type); + + WriteBatch batch; + WriteCommand write = new WriteCommand(location, data, sync); + + // Locate datafile and enqueue into the executor in sychronized block so that + // writes get equeued onto the executor in order that they were assigned by + // the data manager (which is basically just appending) + + synchronized(this) { + // Find the position where this item will land at. + DataFile dataFile=dataManager.allocateLocation(location); + batch = enqueue(dataFile, write); + } + + if( sync ) { + try { + batch.latch.await(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } else { + inflightWrites.put(new WriteKey(location), write); + } + + return location; + } + + private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException { + synchronized(enqueueMutex) { + WriteBatch rc=null; + if( shutdown ) { + throw new IOException("Async Writter Thread Shutdown"); + } + if( firstAsyncException !=null ) + throw firstAsyncException; + + if( !running ) { + running=true; + thread = new Thread() { + public void run() { + processQueue(); + } + }; + thread.setPriority(Thread.MAX_PRIORITY); + thread.setDaemon(true); + thread.setName("ActiveMQ Data File Writer"); + thread.start(); + } + + if( nextWriteBatch == null ) { + nextWriteBatch = new WriteBatch(dataFile,write); + rc = nextWriteBatch; + enqueueMutex.notify(); + } else { + // Append to current batch if possible.. + if( nextWriteBatch.canAppend(dataFile, write) ) { + nextWriteBatch.append(write); + rc = nextWriteBatch; + } else { + // Otherwise wait for the queuedCommand to be null + try { + while( nextWriteBatch!=null ) { + enqueueMutex.wait(); + } + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + if( shutdown ) { + throw new IOException("Async Writter Thread Shutdown"); + } + + // Start a new batch. + nextWriteBatch = new WriteBatch(dataFile,write); + rc = nextWriteBatch; + enqueueMutex.notify(); + } + } + return rc; + } + } + + public void close() throws IOException { + synchronized( enqueueMutex ) { + if( shutdown == false ) { + shutdown = true; + if( running ) { + enqueueMutex.notifyAll(); + } else { + shutdownDone.countDown(); + } + } + } + + try { + shutdownDone.await(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + + } + + /** + * The async processing loop that writes to the data files and + * does the force calls. + * + * Since the file sync() call is the slowest of all the operations, + * this algorithm tries to 'batch' or group together several file sync() requests + * into a single file sync() call. The batching is accomplished attaching the + * same CountDownLatch instance to every force request in a group. + * + */ + protected void processQueue() { + DataFile dataFile=null; + RandomAccessFile file=null; + try { + + DataByteArrayOutputStream buff = new DataByteArrayOutputStream(MAX_WRITE_BATCH_SIZE); + while( true ) { + + Object o = null; + + // Block till we get a command. + synchronized(enqueueMutex) { + while( true ) { + if( shutdown ) { + o = SHUTDOWN_COMMAND; + break; + } + if( nextWriteBatch!=null ) { + o = nextWriteBatch; + nextWriteBatch=null; + break; + } + enqueueMutex.wait(); + } + enqueueMutex.notify(); + } + + + if( o == SHUTDOWN_COMMAND ) { + break; + } + + WriteBatch wb = (WriteBatch) o; + if( dataFile != wb.dataFile ) { + if( file!=null ) { + dataFile.closeRandomAccessFile(file); + } + dataFile = wb.dataFile; + file = dataFile.openRandomAccessFile(true); + } + + WriteCommand write = wb.first; + + // Write all the data. + // Only need to seek to first location.. all others + // are in sequence. + file.seek(write.location.getOffset()); + + // + // is it just 1 big write? + if( wb.size == write.location.getSize() ) { + + // Just write it directly.. + file.writeInt(write.location.getSize()); + file.writeByte(write.location.getType()); + file.write(RESERVED_SPACE); + file.write(AsyncDataManager.ITEM_HEAD_SOR); + file.write(write.data.getData(),write.data.getOffset(), write.data.getLength()); + file.write(AsyncDataManager.ITEM_HEAD_EOR); + + } else { + + // Combine the smaller writes into 1 big buffer + while( write!=null ) { + + buff.writeInt(write.location.getSize()); + buff.writeByte(write.location.getType()); + buff.write(RESERVED_SPACE); + buff.write(AsyncDataManager.ITEM_HEAD_SOR); + buff.write(write.data.getData(),write.data.getOffset(), write.data.getLength()); + buff.write(AsyncDataManager.ITEM_HEAD_EOR); + + write = (WriteCommand) write.getNext(); + } + + // Now do the 1 big write. + ByteSequence sequence = buff.toByteSequence(); + file.write(sequence.getData(), sequence.getOffset(), sequence.getLength()); + buff.reset(); + } + + file.getFD().sync(); + + WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode(); + dataManager.setLastAppendLocation( lastWrite.location ); + + // Signal any waiting threads that the write is on disk. + if( wb.latch!=null ) { + wb.latch.countDown(); + } + + // Now that the data is on disk, remove the writes from the in flight + // cache. + write = wb.first; + while( write!=null ) { + if( !write.sync ) { + inflightWrites.remove(new WriteKey(write.location)); + } + write = (WriteCommand) write.getNext(); + } + } + + } catch (IOException e) { + synchronized( enqueueMutex ) { + firstAsyncException = e; + } + } catch (InterruptedException e) { + } finally { + try { + if( file!=null ) { + dataFile.closeRandomAccessFile(file); + } + } catch (IOException e) { + } + shutdownDone.countDown(); + } + } + +} Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java?view=auto&rev=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java Fri Nov 24 22:00:56 2006 @@ -0,0 +1,157 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.IOException; + +import org.apache.activemq.kaha.Marshaller; +import org.apache.activemq.kaha.StoreLocation; +import org.apache.activemq.kaha.impl.data.RedoListener; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.DataByteArrayInputStream; +import org.apache.activemq.util.DataByteArrayOutputStream; + +/** + * Provides a Kaha DataManager Facade to the DataManager. + * + * @version $Revision: 1.1.1.1 $ + */ +public final class DataManagerFacade implements org.apache.activemq.kaha.impl.DataManager { + + private static class StoreLocationFacade implements StoreLocation { + private final Location location; + + public StoreLocationFacade(Location location) { + this.location = location; + } + + public int getFile() { + return location.getDataFileId(); + } + + public long getOffset() { + return location.getOffset(); + } + + public int getSize() { + return location.getSize(); + } + + public Location getLocation() { + return location; + } + } + + static private StoreLocation convertToStoreLocation(Location location) { + if(location==null) + return null; + return new StoreLocationFacade(location); + } + + static private Location convertFromStoreLocation(StoreLocation location) { + + if(location==null) + return null; + + if( location.getClass()== StoreLocationFacade.class ) + return ((StoreLocationFacade)location).getLocation(); + + Location l = new Location(); + l.setOffset((int) location.getOffset()); + l.setSize(location.getSize()); + l.setDataFileId(location.getFile()); + return l; + } + + static final private ByteSequence FORCE_COMMAND = new ByteSequence(new byte[]{'F', 'O', 'R', 'C', 'E'}); + + AsyncDataManager dataManager; + private final String name; + private Marshaller redoMarshaller; + + + public DataManagerFacade(AsyncDataManager dataManager, String name) { + this.dataManager=dataManager; + this.name = name; + } + + public Object readItem(Marshaller marshaller, StoreLocation location) throws IOException { + ByteSequence sequence = dataManager.read(convertFromStoreLocation(location)); + DataByteArrayInputStream dataIn = new DataByteArrayInputStream(sequence); + return marshaller.readPayload(dataIn); + } + + + public StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException { + final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream(); + marshaller.writePayload(payload,buffer); + ByteSequence data = buffer.toByteSequence(); + return convertToStoreLocation(dataManager.write(data, (byte)1, false)); + } + + + public void force() throws IOException { + dataManager.write(FORCE_COMMAND, (byte)2, true); + } + + public void updateItem(StoreLocation location, Marshaller marshaller, Object payload) throws IOException { + final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream(); + marshaller.writePayload(payload,buffer); + ByteSequence data = buffer.toByteSequence(); + dataManager.update(convertFromStoreLocation(location), data, false); + } + + public void close() throws IOException { + dataManager.close(); + } + + public void consolidateDataFiles() throws IOException { + dataManager.consolidateDataFiles(); + } + + public boolean delete() throws IOException { + return dataManager.delete(); + } + + public void addInterestInFile(int file) throws IOException { + dataManager.addInterestInFile(file); + } + public void removeInterestInFile(int file) throws IOException { + dataManager.removeInterestInFile(file); + } + + public void recoverRedoItems(RedoListener listener) throws IOException { + throw new RuntimeException("Not Implemented.."); + } + public StoreLocation storeRedoItem(Object payload) throws IOException { + throw new RuntimeException("Not Implemented.."); + } + + public Marshaller getRedoMarshaller() { + return redoMarshaller; + } + public void setRedoMarshaller(Marshaller redoMarshaller) { + this.redoMarshaller = redoMarshaller; + } + + public String getName() { + return name; + } + + +} Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java?view=auto&rev=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java Fri Nov 24 22:00:56 2006 @@ -0,0 +1,108 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.IOException; + +import org.apache.activeio.journal.InvalidRecordLocationException; +import org.apache.activeio.journal.Journal; +import org.apache.activeio.journal.JournalEventListener; +import org.apache.activeio.journal.RecordLocation; +import org.apache.activeio.packet.ByteArrayPacket; +import org.apache.activeio.packet.Packet; +import org.apache.activemq.util.ByteSequence; + +/** + * Provides a Journal Facade to the DataManager. + * + * @version $Revision: 1.1.1.1 $ + */ +public final class JournalFacade implements Journal { + + + public static class RecordLocationFacade implements RecordLocation { + private final Location location; + + public RecordLocationFacade(Location location) { + this.location = location; + } + + public Location getLocation() { + return location; + } + + public int compareTo(Object o) { + RecordLocationFacade rlf = (RecordLocationFacade)o; + int rc = location.compareTo(rlf.location); + return rc; + } + } + + static private RecordLocation convertToRecordLocation(Location location) { + if(location==null) + return null; + return new RecordLocationFacade(location); + } + + static private Location convertFromRecordLocation(RecordLocation location) { + + if(location==null) + return null; + + return ((RecordLocationFacade)location).getLocation(); + } + + AsyncDataManager dataManager; + + public JournalFacade(AsyncDataManager dataManager) { + this.dataManager = dataManager; + } + + public void close() throws IOException { + dataManager.close(); + } + + public RecordLocation getMark() throws IllegalStateException { + return convertToRecordLocation(dataManager.getMark()); + } + + public RecordLocation getNextRecordLocation(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException { + return convertToRecordLocation(dataManager.getNextLocation(convertFromRecordLocation(location))); + } + + public Packet read(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException { + ByteSequence rc = dataManager.read(convertFromRecordLocation(location)); + if( rc == null ) + return null; + return new ByteArrayPacket(rc.getData(), rc.getOffset(), rc.getLength()); + } + + public void setJournalEventListener(JournalEventListener listener) throws IllegalStateException { + } + + public void setMark(RecordLocation location, boolean sync) throws InvalidRecordLocationException, IOException, IllegalStateException { + dataManager.setMark(convertFromRecordLocation(location), sync); + } + + public RecordLocation write(Packet packet, boolean sync) throws IOException, IllegalStateException { + org.apache.activeio.packet.ByteSequence data = packet.asByteSequence(); + ByteSequence sequence = new ByteSequence(data.getData(), data.getOffset(), data.getLength()); + return convertToRecordLocation(dataManager.write(sequence, sync)); + } + +} Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java?view=auto&rev=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java Fri Nov 24 22:00:56 2006 @@ -0,0 +1,126 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Used as a location in the data store. + * + * @version $Revision: 1.2 $ + */ +public final class Location { + + public static final byte MARK_TYPE=-1; + public static final byte USER_TYPE=1; + public static final byte NOT_SET_TYPE=0; + public static final int NOT_SET=-1; + + private int dataFileId=NOT_SET; + private int offset=NOT_SET; + private int size=NOT_SET; + private byte type=NOT_SET_TYPE; + + public Location(){} + + Location(Location item) { + this.dataFileId = item.dataFileId; + this.offset = item.offset; + this.size = item.size; + this.type = item.type; + } + + boolean isValid(){ + return dataFileId != NOT_SET; + } + + /** + * @return the size of the data record including the header. + */ + public int getSize(){ + return size; + } + + /** + * @param size the size of the data record including the header. + */ + public void setSize(int size){ + this.size=size; + } + + /** + * @return the size of the payload of the record. + */ + public int getPaylodSize() { + return size-AsyncDataManager.ITEM_HEAD_FOOT_SPACE; + } + + public int getOffset(){ + return offset; + } + public void setOffset(int offset){ + this.offset=offset; + } + + public int getDataFileId(){ + return dataFileId; + } + + public void setDataFileId(int file){ + this.dataFileId=file; + } + + public byte getType() { + return type; + } + + public void setType(byte type) { + this.type = type; + } + + public String toString(){ + String result="offset = "+offset+", file = " + dataFileId + ", size = "+size + ", type = "+type; + return result; + } + + public int compareTo(Object o) { + Location l = (Location)o; + if( dataFileId == l.dataFileId ) { + int rc = offset-l.offset; + return rc; + } + return dataFileId - l.dataFileId; + } + + public void writeExternal(DataOutput dos) throws IOException { + dos.writeInt(dataFileId); + dos.writeInt(offset); + dos.writeInt(size); + dos.writeByte(type); + } + + public void readExternal(DataInput dis) throws IOException { + dataFileId = dis.readInt(); + offset = dis.readInt(); + size = dis.readInt(); + type = dis.readByte(); + } + +} Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java ------------------------------------------------------------------------------ svn:executable = * Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java?view=auto&rev=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java Fri Nov 24 22:00:56 2006 @@ -0,0 +1,213 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +/** + * An AsyncDataFileAppender that uses NIO ByteBuffers and File chanels to more efficently + * copy data to files. + * + * @version $Revision: 1.1.1.1 $ + */ +class NIODataFileAppender extends DataFileAppender { + + public NIODataFileAppender(AsyncDataManager fileManager) { + super(fileManager); + } + + /** + * The async processing loop that writes to the data files and + * does the force calls. + * + * Since the file sync() call is the slowest of all the operations, + * this algorithm tries to 'batch' or group together several file sync() requests + * into a single file sync() call. The batching is accomplished attaching the + * same CountDownLatch instance to every force request in a group. + * + */ + protected void processQueue() { + DataFile dataFile=null; + RandomAccessFile file=null; + FileChannel channel=null; + + try { + + ByteBuffer header = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_HEAD_SPACE); + ByteBuffer footer = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_FOOT_SPACE); + ByteBuffer buffer = ByteBuffer.allocateDirect(MAX_WRITE_BATCH_SIZE); + + // Populate the static parts of the headers and footers.. + header.putInt(0); // size + header.put((byte) 0); // type + header.put(RESERVED_SPACE); // reserved + header.put(AsyncDataManager.ITEM_HEAD_SOR); + footer.put(AsyncDataManager.ITEM_HEAD_EOR); + + while( true ) { + + Object o = null; + + // Block till we get a command. + synchronized(enqueueMutex) { + while( true ) { + if( shutdown ) { + o = SHUTDOWN_COMMAND; + break; + } + if( nextWriteBatch!=null ) { + o = nextWriteBatch; + nextWriteBatch=null; + break; + } + enqueueMutex.wait(); + } + enqueueMutex.notify(); + } + + + if( o == SHUTDOWN_COMMAND ) { + break; + } + + WriteBatch wb = (WriteBatch) o; + if( dataFile != wb.dataFile ) { + if( file!=null ) { + dataFile.closeRandomAccessFile(file); + } + dataFile = wb.dataFile; + file = dataFile.openRandomAccessFile(true); + channel = file.getChannel(); + } + + WriteCommand write = wb.first; + + // Write all the data. + // Only need to seek to first location.. all others + // are in sequence. + file.seek(write.location.getOffset()); + + // + // is it just 1 big write? + if( wb.size == write.location.getSize() ) { + + header.clear(); + header.putInt(write.location.getSize()); + header.put(write.location.getType()); + header.clear(); + transfer(header, channel); + ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength()); + transfer(source, channel); + footer.clear(); + transfer(footer, channel); + + } else { + + // Combine the smaller writes into 1 big buffer + while( write!=null ) { + + header.clear(); + header.putInt(write.location.getSize()); + header.put(write.location.getType()); + header.clear(); + copy(header, buffer); + assert !header.hasRemaining(); + + ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength()); + copy(source, buffer); + assert !source.hasRemaining(); + + footer.clear(); + copy(footer, buffer); + assert !footer.hasRemaining(); + + write = (WriteCommand) write.getNext(); + } + + // Fully write out the buffer.. + buffer.flip(); + transfer(buffer, channel); + buffer.clear(); + } + + file.getChannel().force(false); + + WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode(); + dataManager.setLastAppendLocation( lastWrite.location ); + + // Signal any waiting threads that the write is on disk. + if( wb.latch!=null ) { + wb.latch.countDown(); + } + + // Now that the data is on disk, remove the writes from the in flight + // cache. + write = wb.first; + while( write!=null ) { + if( !write.sync ) { + inflightWrites.remove(new WriteKey(write.location)); + } + write = (WriteCommand) write.getNext(); + } + } + + } catch (IOException e) { + synchronized( enqueueMutex ) { + firstAsyncException = e; + } + } catch (InterruptedException e) { + } finally { + try { + if( file!=null ) { + dataFile.closeRandomAccessFile(file); + } + } catch (IOException e) { + } + shutdownDone.countDown(); + } + } + + /** + * Copy the bytes in header to the channel. + * @param header - source of data + * @param channel - destination where the data will be written. + * @throws IOException + */ + private void transfer(ByteBuffer header, FileChannel channel) throws IOException { + while (header.hasRemaining()) { + channel.write(header); + } + } + + private int copy(ByteBuffer src, ByteBuffer dest) { + int rc = Math.min(dest.remaining(), src.remaining()); + if( rc > 0 ) { + // Adjust our limit so that we don't overflow the dest buffer. + int limit = src.limit(); + src.limit(src.position()+rc); + dest.put(src); + // restore the limit. + src.limit(limit); + } + return rc; + } + +} Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java?view=diff&rev=479089&r1=479088&r2=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java Fri Nov 24 22:00:56 2006 @@ -24,7 +24,7 @@ import org.apache.activemq.kaha.IndexTypes; import org.apache.activemq.kaha.RuntimeStoreException; import org.apache.activemq.kaha.StoreEntry; -import org.apache.activemq.kaha.impl.data.DataManager; +import org.apache.activemq.kaha.impl.DataManager; import org.apache.activemq.kaha.impl.data.Item; import org.apache.activemq.kaha.impl.index.DiskIndexLinkedList; import org.apache.activemq.kaha.impl.index.IndexItem; Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=479089&r1=479088&r2=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java Fri Nov 24 22:00:56 2006 @@ -27,7 +27,7 @@ import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreEntry; import org.apache.activemq.kaha.StoreLocation; -import org.apache.activemq.kaha.impl.data.DataManager; +import org.apache.activemq.kaha.impl.DataManager; import org.apache.activemq.kaha.impl.data.Item; import org.apache.activemq.kaha.impl.index.IndexItem; import org.apache.activemq.kaha.impl.index.IndexManager; Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?view=diff&rev=479089&r1=479088&r2=479089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Fri Nov 24 22:00:56 2006 @@ -32,7 +32,7 @@ import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreEntry; import org.apache.activemq.kaha.StoreLocation; -import org.apache.activemq.kaha.impl.data.DataManager; +import org.apache.activemq.kaha.impl.DataManager; import org.apache.activemq.kaha.impl.data.Item; import org.apache.activemq.kaha.impl.index.IndexItem; import org.apache.activemq.kaha.impl.index.IndexLinkedList;