Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 62948 invoked from network); 18 Jul 2008 15:50:52 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 18 Jul 2008 15:50:52 -0000 Received: (qmail 25418 invoked by uid 500); 18 Jul 2008 15:50:52 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 25396 invoked by uid 500); 18 Jul 2008 15:50:52 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 25318 invoked by uid 99); 18 Jul 2008 15:50:52 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Jul 2008 08:50:52 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Jul 2008 15:50:03 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 371E32388A39; Fri, 18 Jul 2008 08:49:58 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r677944 [2/11] - in /activemq/sandbox/kahadb: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/kahadb/ src/main/java/org/apache/kahadb/impl/ src/main/java/org/apache/kahadb/impl/async/ s... 
Date: Fri, 18 Jul 2008 15:49:52 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080718154958.371E32388A39@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/KahaStore.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/KahaStore.java?rev=677944&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/KahaStore.java (added) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/KahaStore.java Fri Jul 18 08:49:48 2008 @@ -0,0 +1,588 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kahadb.impl; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileLock; +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.kahadb.ContainerId; +import org.apache.kahadb.ListContainer; +import org.apache.kahadb.MapContainer; +import org.apache.kahadb.Store; +import org.apache.kahadb.StoreLocation; +import org.apache.kahadb.impl.async.AsyncDataManager; +import org.apache.kahadb.impl.async.DataManagerFacade; +import org.apache.kahadb.impl.container.ListContainerImpl; +import org.apache.kahadb.impl.container.MapContainerImpl; +import org.apache.kahadb.impl.data.DataManagerImpl; +import org.apache.kahadb.impl.data.Item; +import org.apache.kahadb.impl.data.RedoListener; +import org.apache.kahadb.impl.index.IndexItem; +import org.apache.kahadb.impl.index.IndexManager; +import org.apache.kahadb.impl.index.RedoStoreIndexItem; +import org.apache.kahadb.util.IOHelper; + +/** + * Store Implementation + * + * @version $Revision: 1.1.1.1 $ + */ +public class KahaStore implements Store { + + private static final String PROPERTY_PREFIX = "org.apache.activemq.kaha.Store"; + private static final boolean BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX + + ".FileLockBroken", + "false")); + private static final boolean DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX + + ".DisableLocking", + "false")); + //according to the String javadoc, all constant strings are interned so this will be the same object throughout the vm + //and we can use it as a monitor for the lockset. 
+ private final static String LOCKSET_MONITOR = PROPERTY_PREFIX + ".Lock.Monitor"; + private static final Log LOG = LogFactory.getLog(KahaStore.class); + + private final File directory; + private final String mode; + private IndexRootContainer mapsContainer; + private IndexRootContainer listsContainer; + private Map lists = new ConcurrentHashMap(); + private Map maps = new ConcurrentHashMap(); + private Map dataManagers = new ConcurrentHashMap(); + private Map indexManagers = new ConcurrentHashMap(); + private IndexManager rootIndexManager; // contains all the root indexes + private boolean closed; + private boolean initialized; + private boolean logIndexChanges; + private boolean useAsyncDataManager; + private long maxDataFileLength = 1024 * 1024 * 32; + private FileLock lock; + private boolean persistentIndex = true; + private RandomAccessFile lockFile; + private final AtomicLong storeSize; + private String defaultContainerName = DEFAULT_CONTAINER_NAME; + + + public KahaStore(String name, String mode) throws IOException { + this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, new AtomicLong()); + } + + public KahaStore(File directory, String mode) throws IOException { + this(directory, mode, new AtomicLong()); + } + + public KahaStore(String name, String mode,AtomicLong storeSize) throws IOException { + this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, storeSize); + } + + public KahaStore(File directory, String mode, AtomicLong storeSize) throws IOException { + this.mode = mode; + this.storeSize = storeSize; + this.directory = directory; + IOHelper.mkdirs(this.directory); + } + + public synchronized void close() throws IOException { + if (!closed) { + closed = true; + if (initialized) { + unlock(); + for (ListContainerImpl container : lists.values()) { + container.close(); + } + lists.clear(); + for (MapContainerImpl container : maps.values()) { + container.close(); + } + maps.clear(); + for (Iterator iter = 
indexManagers.values().iterator(); iter.hasNext();) { + IndexManager im = iter.next(); + im.close(); + iter.remove(); + } + for (Iterator iter = dataManagers.values().iterator(); iter.hasNext();) { + DataManager dm = iter.next(); + dm.close(); + iter.remove(); + } + } + if (lockFile!=null) { + lockFile.close(); + lockFile=null; + } + } + } + + public synchronized void force() throws IOException { + if (initialized) { + for (Iterator iter = indexManagers.values().iterator(); iter.hasNext();) { + IndexManager im = iter.next(); + im.force(); + } + for (Iterator iter = dataManagers.values().iterator(); iter.hasNext();) { + DataManager dm = iter.next(); + dm.force(); + } + } + } + + public synchronized void clear() throws IOException { + initialize(); + for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) { + ContainerId id = (ContainerId)i.next(); + MapContainer container = getMapContainer(id.getKey(), id.getDataContainerName()); + container.clear(); + } + for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) { + ContainerId id = (ContainerId)i.next(); + ListContainer container = getListContainer(id.getKey(), id.getDataContainerName()); + container.clear(); + } + + } + + public synchronized boolean delete() throws IOException { + boolean result = true; + if (initialized) { + clear(); + for (Iterator iter = indexManagers.values().iterator(); iter.hasNext();) { + IndexManager im = iter.next(); + result &= im.delete(); + iter.remove(); + } + for (Iterator iter = dataManagers.values().iterator(); iter.hasNext();) { + DataManager dm = iter.next(); + result &= dm.delete(); + iter.remove(); + } + } + if (directory != null && directory.isDirectory()) { + result =IOHelper.deleteChildren(directory); + String str = result ? 
"successfully deleted" : "failed to delete"; + LOG.info("Kaha Store " + str + " data directory " + directory); + } + return result; + } + + public synchronized boolean isInitialized() { + return initialized; + } + + public boolean doesMapContainerExist(Object id) throws IOException { + return doesMapContainerExist(id, defaultContainerName); + } + + public synchronized boolean doesMapContainerExist(Object id, String containerName) throws IOException { + initialize(); + ContainerId containerId = new ContainerId(); + containerId.setKey(id); + containerId.setDataContainerName(containerName); + return maps.containsKey(containerId) || mapsContainer.doesRootExist(containerId); + } + + public MapContainer getMapContainer(Object id) throws IOException { + return getMapContainer(id, defaultContainerName); + } + + public MapContainer getMapContainer(Object id, String containerName) throws IOException { + return getMapContainer(id, containerName, persistentIndex); + } + + public synchronized MapContainer getMapContainer(Object id, String originalContainerName, boolean persistentIndex) + throws IOException { + initialize(); + String containerName = IOHelper.toFileSystemSafeName(originalContainerName); + ContainerId containerId = new ContainerId(); + containerId.setKey(id); + containerId.setDataContainerName(containerName); + MapContainerImpl result = maps.get(containerId); + if (result == null) { + DataManager dm = getDataManager(containerName); + IndexManager im = getIndexManager(dm, containerName); + IndexItem root = mapsContainer.getRoot(im, containerId); + if (root == null) { + root = mapsContainer.addRoot(im, containerId); + } + result = new MapContainerImpl(directory, containerId, root, im, dm, persistentIndex); + maps.put(containerId, result); + } + return result; + } + + public void deleteMapContainer(Object id) throws IOException { + deleteMapContainer(id, defaultContainerName); + } + + public void deleteMapContainer(Object id, String containerName) throws IOException 
{ + ContainerId containerId = new ContainerId(id, containerName); + deleteMapContainer(containerId); + } + + public synchronized void deleteMapContainer(ContainerId containerId) throws IOException { + initialize(); + MapContainerImpl container = maps.remove(containerId); + if (container != null) { + container.clear(); + mapsContainer.removeRoot(container.getIndexManager(), containerId); + container.close(); + } + } + + public synchronized Set getMapContainerIds() throws IOException { + initialize(); + Set set = new HashSet(); + for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) { + ContainerId id = (ContainerId)i.next(); + set.add(id); + } + return set; + } + + public boolean doesListContainerExist(Object id) throws IOException { + return doesListContainerExist(id, defaultContainerName); + } + + public synchronized boolean doesListContainerExist(Object id, String containerName) throws IOException { + initialize(); + ContainerId containerId = new ContainerId(); + containerId.setKey(id); + containerId.setDataContainerName(containerName); + return lists.containsKey(containerId) || listsContainer.doesRootExist(containerId); + } + + public ListContainer getListContainer(Object id) throws IOException { + return getListContainer(id, defaultContainerName); + } + + public ListContainer getListContainer(Object id, String containerName) throws IOException { + return getListContainer(id, containerName, persistentIndex); + } + + public synchronized ListContainer getListContainer(Object id, String originalContainerName, + boolean persistentIndex) throws IOException { + initialize(); + String containerName = IOHelper.toFileSystemSafeName(originalContainerName); + ContainerId containerId = new ContainerId(); + containerId.setKey(id); + containerId.setDataContainerName(containerName); + ListContainerImpl result = lists.get(containerId); + if (result == null) { + DataManager dm = getDataManager(containerName); + IndexManager im = getIndexManager(dm, containerName); 
+ + IndexItem root = listsContainer.getRoot(im, containerId); + if (root == null) { + root = listsContainer.addRoot(im, containerId); + } + result = new ListContainerImpl(containerId, root, im, dm, persistentIndex); + lists.put(containerId, result); + } + return result; + } + + public void deleteListContainer(Object id) throws IOException { + deleteListContainer(id, defaultContainerName); + } + + public synchronized void deleteListContainer(Object id, String containerName) throws IOException { + ContainerId containerId = new ContainerId(id, containerName); + deleteListContainer(containerId); + } + + public synchronized void deleteListContainer(ContainerId containerId) throws IOException { + initialize(); + ListContainerImpl container = lists.remove(containerId); + if (container != null) { + listsContainer.removeRoot(container.getIndexManager(), containerId); + container.clear(); + container.close(); + } + } + + public synchronized Set getListContainerIds() throws IOException { + initialize(); + Set set = new HashSet(); + for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) { + ContainerId id = (ContainerId)i.next(); + set.add(id); + } + return set; + } + + /** + * @return the listsContainer + */ + public IndexRootContainer getListsContainer() { + return this.listsContainer; + } + + /** + * @return the mapsContainer + */ + public IndexRootContainer getMapsContainer() { + return this.mapsContainer; + } + + public synchronized DataManager getDataManager(String name) throws IOException { + DataManager dm = dataManagers.get(name); + if (dm == null) { + if (isUseAsyncDataManager()) { + AsyncDataManager t = new AsyncDataManager(storeSize); + t.setDirectory(directory); + t.setFilePrefix("async-data-" + name + "-"); + t.setMaxFileLength((int)maxDataFileLength); + t.start(); + dm = new DataManagerFacade(t, name); + } else { + DataManagerImpl t = new DataManagerImpl(directory, name,storeSize); + t.setMaxFileLength(maxDataFileLength); + dm = t; + } + if 
(logIndexChanges) { + recover(dm); + } + dataManagers.put(name, dm); + } + return dm; + } + + public synchronized IndexManager getIndexManager(DataManager dm, String name) throws IOException { + IndexManager im = indexManagers.get(name); + if (im == null) { + im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null,storeSize); + indexManagers.put(name, im); + } + return im; + } + + private void recover(final DataManager dm) throws IOException { + dm.recoverRedoItems(new RedoListener() { + public void onRedoItem(StoreLocation item, Object o) throws Exception { + RedoStoreIndexItem redo = (RedoStoreIndexItem)o; + // IndexManager im = getIndexManager(dm, redo.getIndexName()); + IndexManager im = getIndexManager(dm, dm.getName()); + im.redo(redo); + } + }); + } + + public synchronized boolean isLogIndexChanges() { + return logIndexChanges; + } + + public synchronized void setLogIndexChanges(boolean logIndexChanges) { + this.logIndexChanges = logIndexChanges; + } + + /** + * @return the maxDataFileLength + */ + public synchronized long getMaxDataFileLength() { + return maxDataFileLength; + } + + /** + * @param maxDataFileLength the maxDataFileLength to set + */ + public synchronized void setMaxDataFileLength(long maxDataFileLength) { + this.maxDataFileLength = maxDataFileLength; + } + + /** + * @see org.apache.kahadb.IndexTypes + * @return the default index type + */ + public synchronized String getIndexTypeAsString() { + return persistentIndex ? 
"PERSISTENT" : "VM"; + } + + /** + * Set the default index type + * + * @param type + * @see org.apache.kahadb.IndexTypes + */ + public synchronized void setIndexTypeAsString(String type) { + if (type.equalsIgnoreCase("VM")) { + persistentIndex = false; + } else { + persistentIndex = true; + } + } + + public boolean isPersistentIndex() { + return persistentIndex; + } + + public void setPersistentIndex(boolean persistentIndex) { + this.persistentIndex = persistentIndex; + } + + + public synchronized boolean isUseAsyncDataManager() { + return useAsyncDataManager; + } + + public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) { + this.useAsyncDataManager = useAsyncWriter; + } + + /** + * @return + * @see org.apache.kahadb.Store#size() + */ + public long size(){ + return storeSize.get(); + } + + public String getDefaultContainerName() { + return defaultContainerName; + } + + public void setDefaultContainerName(String defaultContainerName) { + this.defaultContainerName = defaultContainerName; + } + + public synchronized void initialize() throws IOException { + if (closed) { + throw new IOException("Store has been closed."); + } + if (!initialized) { + LOG.info("Kaha Store using data directory " + directory); + lockFile = new RandomAccessFile(new File(directory, "lock"), "rw"); + lock(); + DataManager defaultDM = getDataManager(defaultContainerName); + rootIndexManager = getIndexManager(defaultDM, defaultContainerName); + IndexItem mapRoot = new IndexItem(); + IndexItem listRoot = new IndexItem(); + if (rootIndexManager.isEmpty()) { + mapRoot.setOffset(0); + rootIndexManager.storeIndex(mapRoot); + listRoot.setOffset(IndexItem.INDEX_SIZE); + rootIndexManager.storeIndex(listRoot); + rootIndexManager.setLength(IndexItem.INDEX_SIZE * 2); + } else { + mapRoot = rootIndexManager.getIndex(0); + listRoot = rootIndexManager.getIndex(IndexItem.INDEX_SIZE); + } + initialized = true; + mapsContainer = new IndexRootContainer(mapRoot, rootIndexManager, defaultDM); + 
listsContainer = new IndexRootContainer(listRoot, rootIndexManager, defaultDM); + /** + * Add interest in data files - then consolidate them + */ + generateInterestInMapDataFiles(); + generateInterestInListDataFiles(); + for (Iterator i = dataManagers.values().iterator(); i.hasNext();) { + DataManager dm = i.next(); + dm.consolidateDataFiles(); + } + } + } + + private void lock() throws IOException { + synchronized (LOCKSET_MONITOR) { + if (!DISABLE_LOCKING && directory != null && lock == null) { + String key = getPropertyKey(); + String property = System.getProperty(key); + if (null == property) { + if (!BROKEN_FILE_LOCK) { + lock = lockFile.getChannel().tryLock(); + if (lock == null) { + throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by another application"); + } else + System.setProperty(key, new Date().toString()); + } + } else { //already locked + throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by this application."); + } + } + } + } + + private void unlock() throws IOException { + synchronized (LOCKSET_MONITOR) { + if (!DISABLE_LOCKING && (null != directory) && (null != lock)) { + System.getProperties().remove(getPropertyKey()); + if (lock.isValid()) { + lock.release(); + } + lock = null; + } + } + } + + + private String getPropertyKey() throws IOException { + return getClass().getName() + ".lock." 
+ directory.getCanonicalPath(); + } + + /** + * scans the directory and builds up the IndexManager and DataManager + * + * @throws IOException + */ + private void generateInterestInListDataFiles() throws IOException { + for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) { + ContainerId id = (ContainerId)i.next(); + DataManager dm = getDataManager(id.getDataContainerName()); + IndexManager im = getIndexManager(dm, id.getDataContainerName()); + IndexItem theRoot = listsContainer.getRoot(im, id); + long nextItem = theRoot.getNextItem(); + while (nextItem != Item.POSITION_NOT_SET) { + IndexItem item = im.getIndex(nextItem); + item.setOffset(nextItem); + dm.addInterestInFile(item.getKeyFile()); + dm.addInterestInFile(item.getValueFile()); + nextItem = item.getNextItem(); + } + } + } + + /** + * scans the directory and builds up the IndexManager and DataManager + * + * @throws IOException + */ + private void generateInterestInMapDataFiles() throws IOException { + for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) { + ContainerId id = (ContainerId)i.next(); + DataManager dm = getDataManager(id.getDataContainerName()); + IndexManager im = getIndexManager(dm, id.getDataContainerName()); + IndexItem theRoot = mapsContainer.getRoot(im, id); + long nextItem = theRoot.getNextItem(); + while (nextItem != Item.POSITION_NOT_SET) { + IndexItem item = im.getIndex(nextItem); + item.setOffset(nextItem); + dm.addInterestInFile(item.getKeyFile()); + dm.addInterestInFile(item.getValueFile()); + nextItem = item.getNextItem(); + } + + } + } +} Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/StoreLockedExcpetion.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/StoreLockedExcpetion.java?rev=677944&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/StoreLockedExcpetion.java (added) +++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/StoreLockedExcpetion.java Fri Jul 18 08:49:48 2008 @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kahadb.impl; + +import java.io.IOException; + +/** + * Exception thrown if the store is in use by another application + * + * @version $Revision: 1.1.1.1 $ + */ +public class StoreLockedExcpetion extends IOException { + + private static final long serialVersionUID = 3857646689671366926L; + + /** + * Default Constructor + */ + public StoreLockedExcpetion() { + } + + /** + * @param s + */ + public StoreLockedExcpetion(String s) { + super(s); + } +} Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/AsyncDataManager.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/AsyncDataManager.java?rev=677944&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/AsyncDataManager.java (added) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/AsyncDataManager.java Fri Jul 18 08:49:48 2008 @@ -0,0 +1,753 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kahadb.impl.async; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.kahadb.impl.async.DataFileAppender.WriteCommand; +import org.apache.kahadb.impl.async.DataFileAppender.WriteKey; +import org.apache.kahadb.util.ByteSequence; +import org.apache.kahadb.util.IOHelper; +import org.apache.kahadb.util.Scheduler; + + + +/** + * Manages DataFiles + * + * @version $Revision: 1.1.1.1 $ + */ +public class AsyncDataManager { + + public static final int CONTROL_RECORD_MAX_LENGTH = 1024; + public static final int 
ITEM_HEAD_RESERVED_SPACE = 21; + // ITEM_HEAD_SPACE = length + type+ reserved space + SOR + public static final int ITEM_HEAD_SPACE = 4 + 1 + ITEM_HEAD_RESERVED_SPACE + 3; + public static final int ITEM_HEAD_OFFSET_TO_SOR = ITEM_HEAD_SPACE - 3; + public static final int ITEM_FOOT_SPACE = 3; // EOR + + public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE + ITEM_FOOT_SPACE; + + public static final byte[] ITEM_HEAD_SOR = new byte[] {'S', 'O', 'R'}; // + public static final byte[] ITEM_HEAD_EOR = new byte[] {'E', 'O', 'R'}; // + + public static final byte DATA_ITEM_TYPE = 1; + public static final byte REDO_ITEM_TYPE = 2; + public static final String DEFAULT_DIRECTORY = "data"; + public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive"; + public static final String DEFAULT_FILE_PREFIX = "data-"; + public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32; + public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30; + public static final int PREFERED_DIFF = 1024 * 512; + + private static final Log LOG = LogFactory.getLog(AsyncDataManager.class); + + protected final Map inflightWrites = new ConcurrentHashMap(); + + protected File directory = new File(DEFAULT_DIRECTORY); + protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY); + protected String filePrefix = DEFAULT_FILE_PREFIX; + protected ControlFile controlFile; + protected boolean started; + protected boolean useNio = true; + + protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH; + protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF; + + protected DataFileAppender appender; + protected DataFileAccessorPool accessorPool = new DataFileAccessorPool(this); + + protected Map fileMap = new HashMap(); + protected Map fileByFileMap = new LinkedHashMap(); + protected DataFile currentWriteFile; + + protected Location mark; + protected final AtomicReference lastAppendLocation = new AtomicReference(); + protected Runnable cleanupTask; + protected final 
AtomicLong storeSize; + protected boolean archiveDataLogs; + + public AsyncDataManager(AtomicLong storeSize) { + this.storeSize=storeSize; + } + + public AsyncDataManager() { + this(new AtomicLong()); + } + + @SuppressWarnings("unchecked") + public synchronized void start() throws IOException { + if (started) { + return; + } + + started = true; + preferedFileLength=Math.max(PREFERED_DIFF, getMaxFileLength()-PREFERED_DIFF); + lock(); + + ByteSequence sequence = controlFile.load(); + if (sequence != null && sequence.getLength() > 0) { + unmarshallState(sequence); + } + if (useNio) { + appender = new NIODataFileAppender(this); + } else { + appender = new DataFileAppender(this); + } + + File[] files = directory.listFiles(new FilenameFilter() { + public boolean accept(File dir, String n) { + return dir.equals(directory) && n.startsWith(filePrefix); + } + }); + + if (files != null) { + for (int i = 0; i < files.length; i++) { + try { + File file = files[i]; + String n = file.getName(); + String numStr = n.substring(filePrefix.length(), n.length()); + int num = Integer.parseInt(numStr); + DataFile dataFile = new DataFile(file, num, preferedFileLength); + fileMap.put(dataFile.getDataFileId(), dataFile); + storeSize.addAndGet(dataFile.getLength()); + } catch (NumberFormatException e) { + // Ignore file that do not match the pattern. + } + } + + // Sort the list so that we can link the DataFiles together in the + // right order. + List l = new ArrayList(fileMap.values()); + Collections.sort(l); + currentWriteFile = null; + for (DataFile df : l) { + if (currentWriteFile != null) { + currentWriteFile.linkAfter(df); + } + currentWriteFile = df; + fileByFileMap.put(df.getFile(), df); + } + } + + // Need to check the current Write File to see if there was a partial + // write to it. + if (currentWriteFile != null) { + + // See if the lastSyncedLocation is valid.. 
+ Location l = lastAppendLocation.get(); + if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) { + l = null; + } + + // If we know the last location that was ok.. then we can skip lots + // of checking + try{ + l = recoveryCheck(currentWriteFile, l); + lastAppendLocation.set(l); + }catch(IOException e){ + LOG.warn("recovery check failed", e); + } + } + + storeState(false); + + cleanupTask = new Runnable() { + public void run() { + cleanup(); + } + }; + Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL); + } + + public void lock() throws IOException { + synchronized (this) { + if (controlFile == null) { + IOHelper.mkdirs(directory); + controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH); + } + controlFile.lock(); + } + } + + protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException { + if (location == null) { + location = new Location(); + location.setDataFileId(dataFile.getDataFileId()); + location.setOffset(0); + } + DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); + try { + reader.readLocationDetails(location); + while (reader.readLocationDetailsAndValidate(location)) { + location.setOffset(location.getOffset() + location.getSize()); + } + } finally { + accessorPool.closeDataFileAccessor(reader); + } + dataFile.setLength(location.getOffset()); + return location; + } + + protected void unmarshallState(ByteSequence sequence) throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength()); + DataInputStream dis = new DataInputStream(bais); + if (dis.readBoolean()) { + mark = new Location(); + mark.readExternal(dis); + } else { + mark = null; + } + if (dis.readBoolean()) { + Location l = new Location(); + l.readExternal(dis); + lastAppendLocation.set(l); + } else { + lastAppendLocation.set(null); + } + } + + private synchronized ByteSequence 
marshallState() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + + if (mark != null) { + dos.writeBoolean(true); + mark.writeExternal(dos); + } else { + dos.writeBoolean(false); + } + Location l = lastAppendLocation.get(); + if (l != null) { + dos.writeBoolean(true); + l.writeExternal(dos); + } else { + dos.writeBoolean(false); + } + + byte[] bs = baos.toByteArray(); + return new ByteSequence(bs, 0, bs.length); + } + + synchronized DataFile allocateLocation(Location location) throws IOException { + if (currentWriteFile == null || ((currentWriteFile.getLength() + location.getSize()) > maxFileLength)) { + int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1; + + String fileName = filePrefix + nextNum; + File file = new File(directory, fileName); + DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength); + //actually allocate the disk space + nextWriteFile.closeRandomAccessFile(nextWriteFile.openRandomAccessFile(true)); + fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile); + fileByFileMap.put(file, nextWriteFile); + if (currentWriteFile != null) { + currentWriteFile.linkAfter(nextWriteFile); + if (currentWriteFile.isUnused()) { + removeDataFile(currentWriteFile); + } + } + currentWriteFile = nextWriteFile; + + } + location.setOffset(currentWriteFile.getLength()); + location.setDataFileId(currentWriteFile.getDataFileId().intValue()); + int size = location.getSize(); + currentWriteFile.incrementLength(size); + currentWriteFile.increment(); + storeSize.addAndGet(size); + return currentWriteFile; + } + + public synchronized void removeLocation(Location location) throws IOException{ + + DataFile dataFile = getDataFile(location); + dataFile.decrement(); + } + + synchronized DataFile getDataFile(Location item) throws IOException { + Integer key = Integer.valueOf(item.getDataFileId()); + DataFile dataFile = fileMap.get(key); + 
if (dataFile == null) { + LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); + throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId()); + } + return dataFile; + } + + synchronized File getFile(Location item) throws IOException { + Integer key = Integer.valueOf(item.getDataFileId()); + DataFile dataFile = fileMap.get(key); + if (dataFile == null) { + LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap); + throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId()); + } + return dataFile.getFile(); + } + + private DataFile getNextDataFile(DataFile dataFile) { + return (DataFile)dataFile.getNext(); + } + + public synchronized void close() throws IOException { + if (!started) { + return; + } + Scheduler.cancel(cleanupTask); + accessorPool.close(); + storeState(false); + appender.close(); + fileMap.clear(); + fileByFileMap.clear(); + controlFile.unlock(); + controlFile.dispose(); + started = false; + } + + synchronized void cleanup() { + if (accessorPool != null) { + accessorPool.disposeUnused(); + } + } + + public synchronized boolean delete() throws IOException { + + // Close all open file handles... + appender.close(); + accessorPool.close(); + + boolean result = true; + for (Iterator i = fileMap.values().iterator(); i.hasNext();) { + DataFile dataFile = (DataFile)i.next(); + storeSize.addAndGet(-dataFile.getLength()); + result &= dataFile.delete(); + } + fileMap.clear(); + fileByFileMap.clear(); + lastAppendLocation.set(null); + mark = null; + currentWriteFile = null; + + // reopen open file handles... 
+ accessorPool = new DataFileAccessorPool(this); + if (useNio) { + appender = new NIODataFileAppender(this); + } else { + appender = new DataFileAppender(this); + } + return result; + } + + public synchronized void addInterestInFile(int file) throws IOException { + if (file >= 0) { + Integer key = Integer.valueOf(file); + DataFile dataFile = (DataFile)fileMap.get(key); + if (dataFile == null) { + throw new IOException("That data file does not exist"); + } + addInterestInFile(dataFile); + } + } + + synchronized void addInterestInFile(DataFile dataFile) { + if (dataFile != null) { + dataFile.increment(); + } + } + + public synchronized void removeInterestInFile(int file) throws IOException { + if (file >= 0) { + Integer key = Integer.valueOf(file); + DataFile dataFile = (DataFile)fileMap.get(key); + removeInterestInFile(dataFile); + } + + } + + synchronized void removeInterestInFile(DataFile dataFile) throws IOException { + if (dataFile != null) { + if (dataFile.decrement() <= 0) { + removeDataFile(dataFile); + } + } + } + + public synchronized void consolidateDataFilesNotIn(Set inUse, SetinProgress) throws IOException { + Set unUsed = new HashSet(fileMap.keySet()); + unUsed.removeAll(inUse); + unUsed.removeAll(inProgress); + + List purgeList = new ArrayList(); + for (Integer key : unUsed) { + DataFile dataFile = (DataFile)fileMap.get(key); + purgeList.add(dataFile); + } + for (DataFile dataFile : purgeList) { + if (dataFile.getDataFileId() != currentWriteFile.getDataFileId()) { + forceRemoveDataFile(dataFile); + } + } + } + + public synchronized void consolidateDataFilesNotIn(Set inUse, Integer lastFile) throws IOException { + Set unUsed = new HashSet(fileMap.keySet()); + unUsed.removeAll(inUse); + + List purgeList = new ArrayList(); + for (Integer key : unUsed) { + // Only add files less than the lastFile.. 
+ if( key.intValue() < lastFile.intValue() ) { + DataFile dataFile = (DataFile)fileMap.get(key); + purgeList.add(dataFile); + } + } + for (DataFile dataFile : purgeList) { + forceRemoveDataFile(dataFile); + } + } + + public synchronized void consolidateDataFiles() throws IOException { + List purgeList = new ArrayList(); + for (DataFile dataFile : fileMap.values()) { + if (dataFile.isUnused()) { + purgeList.add(dataFile); + } + } + for (DataFile dataFile : purgeList) { + removeDataFile(dataFile); + } + } + + private synchronized void removeDataFile(DataFile dataFile) throws IOException { + + // Make sure we don't delete too much data. + if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) { + LOG.debug("Won't remove DataFile" + dataFile); + return; + } + forceRemoveDataFile(dataFile); + } + + private synchronized void forceRemoveDataFile(DataFile dataFile) + throws IOException { + accessorPool.disposeDataFileAccessors(dataFile); + fileByFileMap.remove(dataFile.getFile()); + DataFile removed = fileMap.remove(dataFile.getDataFileId()); + storeSize.addAndGet(-dataFile.getLength()); + dataFile.unlink(); + if (archiveDataLogs) { + dataFile.move(getDirectoryArchive()); + LOG.info("moved data file " + dataFile + " to " + + getDirectoryArchive()); + } else { + boolean result = dataFile.delete(); + LOG.info("discarding data file " + dataFile + + (result ? 
"successful " : "failed")); + } + } + + /** + * @return the maxFileLength + */ + public int getMaxFileLength() { + return maxFileLength; + } + + /** + * @param maxFileLength the maxFileLength to set + */ + public void setMaxFileLength(int maxFileLength) { + this.maxFileLength = maxFileLength; + } + + public String toString() { + return "DataManager:(" + filePrefix + ")"; + } + + public synchronized Location getMark() throws IllegalStateException { + return mark; + } + + public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException { + + Location cur = null; + while (true) { + if (cur == null) { + if (location == null) { + DataFile head = (DataFile)currentWriteFile.getHeadNode(); + cur = new Location(); + cur.setDataFileId(head.getDataFileId()); + cur.setOffset(0); + } else { + // Set to the next offset.. + if( location.getSize() == -1 ) { + cur = new Location(location); + } else { + cur = new Location(location); + cur.setOffset(location.getOffset()+location.getSize()); + } + } + } else { + cur.setOffset(cur.getOffset() + cur.getSize()); + } + + DataFile dataFile = getDataFile(cur); + + // Did it go into the next file?? + if (dataFile.getLength() <= cur.getOffset()) { + dataFile = getNextDataFile(dataFile); + if (dataFile == null) { + return null; + } else { + cur.setDataFileId(dataFile.getDataFileId().intValue()); + cur.setOffset(0); + } + } + + // Load in location size and type. + DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); + try { + reader.readLocationDetails(cur); + } finally { + accessorPool.closeDataFileAccessor(reader); + } + + if (cur.getType() == 0) { + return null; + } else if (cur.getType() > 0) { + // Only return user records. 
+ return cur; + } + } + } + + public synchronized Location getNextLocation(File file, Location lastLocation,boolean thisFileOnly) throws IllegalStateException, IOException{ + DataFile df = fileByFileMap.get(file); + return getNextLocation(df, lastLocation,thisFileOnly); + } + + public synchronized Location getNextLocation(DataFile dataFile, + Location lastLocation,boolean thisFileOnly) throws IOException, IllegalStateException { + + Location cur = null; + while (true) { + if (cur == null) { + if (lastLocation == null) { + DataFile head = (DataFile)dataFile.getHeadNode(); + cur = new Location(); + cur.setDataFileId(head.getDataFileId()); + cur.setOffset(0); + } else { + // Set to the next offset.. + cur = new Location(lastLocation); + cur.setOffset(cur.getOffset() + cur.getSize()); + } + } else { + cur.setOffset(cur.getOffset() + cur.getSize()); + } + + + // Did it go into the next file?? + if (dataFile.getLength() <= cur.getOffset()) { + if (thisFileOnly) { + return null; + }else { + dataFile = getNextDataFile(dataFile); + if (dataFile == null) { + return null; + } else { + cur.setDataFileId(dataFile.getDataFileId().intValue()); + cur.setOffset(0); + } + } + } + + // Load in location size and type. + DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); + try { + reader.readLocationDetails(cur); + } finally { + accessorPool.closeDataFileAccessor(reader); + } + + if (cur.getType() == 0) { + return null; + } else if (cur.getType() > 0) { + // Only return user records. 
+ return cur; + } + } + } + + public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException { + DataFile dataFile = getDataFile(location); + DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); + ByteSequence rc = null; + try { + rc = reader.readRecord(location); + } finally { + accessorPool.closeDataFileAccessor(reader); + } + return rc; + } + + public void setMark(Location location, boolean sync) throws IOException, IllegalStateException { + synchronized (this) { + mark = location; + } + storeState(sync); + } + + protected synchronized void storeState(boolean sync) throws IOException { + ByteSequence state = marshallState(); + appender.storeItem(state, Location.MARK_TYPE, sync); + controlFile.store(state, sync); + } + + public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException { + Location loc = appender.storeItem(data, Location.USER_TYPE, sync); + return loc; + } + + public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException { + Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete); + return loc; + } + + public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException { + return appender.storeItem(data, type, sync); + } + + public void update(Location location, ByteSequence data, boolean sync) throws IOException { + DataFile dataFile = getDataFile(location); + DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile); + try { + updater.updateRecord(location, data, sync); + } finally { + accessorPool.closeDataFileAccessor(updater); + } + } + + public File getDirectory() { + return directory; + } + + public void setDirectory(File directory) { + this.directory = directory; + } + + public String getFilePrefix() { + return filePrefix; + } + + public void setFilePrefix(String filePrefix) { + this.filePrefix = filePrefix; 
+ } + + public Map getInflightWrites() { + return inflightWrites; + } + + public Location getLastAppendLocation() { + return lastAppendLocation.get(); + } + + public void setLastAppendLocation(Location lastSyncedLocation) { + this.lastAppendLocation.set(lastSyncedLocation); + } + + public boolean isUseNio() { + return useNio; + } + + public void setUseNio(boolean useNio) { + this.useNio = useNio; + } + + public File getDirectoryArchive() { + return directoryArchive; + } + + public void setDirectoryArchive(File directoryArchive) { + this.directoryArchive = directoryArchive; + } + + public boolean isArchiveDataLogs() { + return archiveDataLogs; + } + + public void setArchiveDataLogs(boolean archiveDataLogs) { + this.archiveDataLogs = archiveDataLogs; + } + + synchronized public Integer getCurrentDataFileId() { + if( currentWriteFile==null ) + return null; + return currentWriteFile.getDataFileId(); + } + + /** + * Get a set of files - only valid after start() + * @return files currently being used + */ + public Set getFiles(){ + return fileByFileMap.keySet(); + } + + synchronized public long getDiskSize() { + long rc=0; + DataFile cur = (DataFile)currentWriteFile.getHeadNode(); + while( cur !=null ) { + rc += cur.getLength(); + cur = (DataFile) cur.getNext(); + } + return rc; + } + + synchronized public long getDiskSizeUntil(Location startPosition) { + long rc=0; + DataFile cur = (DataFile)currentWriteFile.getHeadNode(); + while( cur !=null ) { + if( cur.getDataFileId().intValue() >= startPosition.getDataFileId() ) { + return rc + startPosition.getOffset(); + } + rc += cur.getLength(); + cur = (DataFile) cur.getNext(); + } + return rc; + } + +} Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ControlFile.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ControlFile.java?rev=677944&view=auto ============================================================================== --- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ControlFile.java (added) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ControlFile.java Fri Jul 18 08:49:48 2008 @@ -0,0 +1,182 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kahadb.impl.async;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;

import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.IOExceptionSupport;

/**
 * Used to reliably store fixed sized state data. The state is stored in a
 * record that is versioned and repeated twice in the file, so that a failure
 * in the middle of the write of the first or second record does not result in
 * an unknown state.
 * <p>
 * File layout, as written by {@link #store(ByteSequence, boolean)}: each copy
 * is a version number (8 bytes), a record slot of {@code maxRecordSize} bytes
 * holding a 4 byte length followed by the data, and the same version number
 * again (8 bytes). A copy is considered intact when both version numbers match.
 *
 * @version $Revision: 1.1 $
 */
public final class ControlFile {

    private static final boolean DISABLE_FILE_LOCK = "true".equals(System.getProperty("java.nio.channels.FileLock.broken", "false"));
    private final File file;

    /** The File that holds the control data. */
    private final RandomAccessFile randomAccessFile;
    private final int maxRecordSize;
    private final int firstRecordStart;
    private final int secondRecordStart;
    private final int firstRecordEnd;
    private final int secondRecordEnd;

    private long version;
    private FileLock lock;
    private boolean disposed;

    /**
     * Opens (creating it if needed) the control file in read/write mode.
     *
     * @param file the file that holds the control data
     * @param recordSize the maximum number of data bytes a record may hold
     * @throws IOException if the file cannot be opened
     */
    public ControlFile(File file, int recordSize) throws IOException {
        this.file = file;
        // Each record slot also holds the 4 byte length prefix.
        this.maxRecordSize = recordSize + 4;

        // Calculate where the records start and end.
        this.firstRecordStart = 8;
        this.secondRecordStart = 8 + maxRecordSize + 8 + 8;
        this.firstRecordEnd = firstRecordStart + maxRecordSize;
        this.secondRecordEnd = secondRecordStart + maxRecordSize;

        randomAccessFile = new RandomAccessFile(file, "rw");
    }

    /**
     * Locks the control file. Does nothing when file locking is disabled via
     * the {@code java.nio.channels.FileLock.broken} system property or when
     * the lock is already held by this object.
     *
     * @throws IOException if the lock cannot be obtained
     */
    public void lock() throws IOException {
        if (DISABLE_FILE_LOCK) {
            return;
        }

        if (lock == null) {
            try {
                lock = randomAccessFile.getChannel().tryLock();
            } catch (OverlappingFileLockException e) {
                throw IOExceptionSupport.create("Control file '" + file + "' could not be locked.", e);
            }
            if (lock == null) {
                throw new IOException("Control file '" + file + "' could not be locked.");
            }
        }
    }

    /**
     * Unlocks the control file if it is currently locked by this object.
     *
     * @throws IOException if the lock cannot be released
     */
    public void unlock() throws IOException {
        if (DISABLE_FILE_LOCK) {
            return;
        }

        if (lock != null) {
            lock.release();
            lock = null;
        }
    }

    /**
     * Releases the lock and closes the file. Safe to call more than once;
     * failures are ignored because nothing useful can be done about them here.
     */
    public void dispose() {
        if (disposed) {
            return;
        }
        disposed = true;
        try {
            unlock();
        } catch (IOException ignored) {
            // best effort: the file is being closed anyway
        }
        try {
            randomAccessFile.close();
        } catch (IOException ignored) {
            // best effort: nothing left to clean up
        }
    }

    /**
     * Loads the most trustworthy copy of the control data. The second copy is
     * preferred when it is intact, otherwise the first copy is used.
     *
     * @return the stored data, or {@code null} if the file is shorter than a
     *         single record slot
     * @throws IOException if neither copy is intact, a stored length is out of
     *         range, or the file cannot be read
     */
    public synchronized ByteSequence load() throws IOException {
        long l = randomAccessFile.length();
        if (l < maxRecordSize) {
            return null;
        }

        randomAccessFile.seek(firstRecordStart - 8);
        long v1 = randomAccessFile.readLong();
        randomAccessFile.seek(firstRecordEnd);
        long v1check = randomAccessFile.readLong();

        randomAccessFile.seek(secondRecordStart - 8);
        long v2 = randomAccessFile.readLong();
        randomAccessFile.seek(secondRecordEnd);
        long v2check = randomAccessFile.readLong();

        byte[] data;
        if (v2 == v2check) {
            version = v2;
            data = readRecord(secondRecordStart);
        } else if (v1 == v1check) {
            version = v1;
            data = readRecord(firstRecordStart);
        } else {
            // Bummer.. Both checks are screwed. we don't know
            // if any of the two buffer are ok. This should
            // only happen is data got corrupted.
            throw new IOException("Control data corrupted.");
        }
        return new ByteSequence(data, 0, data.length);
    }

    /**
     * Stores the data twice, each copy framed by the new version number.
     *
     * @param data the state to store; only the bytes between its offset and
     *        offset + length are written
     * @param sync whether to force the data to disk before returning
     * @throws IOException if the data does not fit in a record slot or the
     *         write fails
     */
    public synchronized void store(ByteSequence data, boolean sync) throws IOException {
        int length = data.getLength();
        if (length > maxRecordSize - 4) {
            // A larger record would run over the trailing version check and
            // into the second copy.
            throw new IOException("Control data too large: " + length + " bytes, maximum is " + (maxRecordSize - 4));
        }

        version++;
        randomAccessFile.setLength((maxRecordSize * 2) + 32);
        randomAccessFile.seek(0);

        // Write the first copy of the control data.
        writeRecord(data, firstRecordEnd);

        // Write the second copy of the control data; the file pointer already
        // sits right behind the first copy's trailing version.
        writeRecord(data, secondRecordEnd);

        if (sync) {
            randomAccessFile.getFD().sync();
        }
    }

    /** Reads the length-prefixed data of the record slot starting at the given position. */
    private byte[] readRecord(int recordStart) throws IOException {
        randomAccessFile.seek(recordStart);
        int size = randomAccessFile.readInt();
        if (size < 0 || size > maxRecordSize - 4) {
            throw new IOException("Control data corrupted.");
        }
        byte[] data = new byte[size];
        randomAccessFile.readFully(data);
        return data;
    }

    /** Writes one versioned copy at the current file position; recordEnd is where its trailing version goes. */
    private void writeRecord(ByteSequence data, int recordEnd) throws IOException {
        randomAccessFile.writeLong(version);
        randomAccessFile.writeInt(data.getLength());
        randomAccessFile.write(data.getData(), data.getOffset(), data.getLength());
        randomAccessFile.seek(recordEnd);
        randomAccessFile.writeLong(version);
    }

}
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFile.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFile.java?rev=677944&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFile.java (added) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFile.java Fri Jul 18 08:49:48 2008 @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.kahadb.impl.async;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.LinkedNode;

/**
 * A single journal data file: the file on disk, its numeric id, the logical
 * length of the data written so far and a reference count. Data files are
 * ordered and compared by their id.
 *
 * @version $Revision: 1.1.1.1 $
 */
public class DataFile extends LinkedNode implements Comparable<DataFile> {

    protected final File file;
    protected final Integer dataFileId;
    protected final int preferedSize;

    protected int length;
    protected int referenceCount;

    /**
     * @param file the file on disk
     * @param number the id of this data file
     * @param preferedSize the size the file is grown to when opened by an appender
     */
    DataFile(File file, int number, int preferedSize) {
        this.file = file;
        this.preferedSize = preferedSize;
        this.dataFileId = Integer.valueOf(number);
        length = (int)(file.exists() ? file.length() : 0);
    }

    File getFile() {
        return file;
    }

    public Integer getDataFileId() {
        return dataFileId;
    }

    public synchronized int getLength() {
        return length;
    }

    // Synchronized like every other accessor of length.
    public synchronized void setLength(int length) {
        this.length = length;
    }

    public synchronized void incrementLength(int size) {
        length += size;
    }

    /** Adds a reference and returns the new reference count. */
    public synchronized int increment() {
        return ++referenceCount;
    }

    /** Drops a reference and returns the new reference count. */
    public synchronized int decrement() {
        return --referenceCount;
    }

    public synchronized int getReferenceCount() {
        return referenceCount;
    }

    /** @return {@code true} when the reference count is zero or less */
    public synchronized boolean isUnused() {
        return referenceCount <= 0;
    }

    public synchronized String toString() {
        String result = file.getName() + " number = " + dataFileId + " , length = " + length + " refCount = " + referenceCount;
        return result;
    }

    /**
     * Opens the file for reading and writing.
     *
     * @param appender when {@code true} and the logical length is below the
     *        preferred size, the file is grown to the preferred size
     */
    public synchronized RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
        RandomAccessFile rc = new RandomAccessFile(file, "rw");
        // When we start to write files size them up so that the OS has a chance
        // to allocate the file contiguously.
        if (appender) {
            if (length < preferedSize) {
                rc.setLength(preferedSize);
            }
        }
        return rc;
    }

    /** Closes the handle, first trimming the file on disk back to the logical length. */
    public synchronized void closeRandomAccessFile(RandomAccessFile file) throws IOException {
        // On close set the file size to the real size.
        if (length != file.length()) {
            file.setLength(getLength());
        }
        file.close();
    }

    public synchronized boolean delete() throws IOException {
        return file.delete();
    }

    public synchronized void move(File targetDirectory) throws IOException {
        IOHelper.moveFile(file, targetDirectory);
    }

    /** Orders data files by id. */
    public int compareTo(DataFile df) {
        // Integer.compareTo cannot overflow, unlike subtracting the two ids.
        return dataFileId.compareTo(df.dataFileId);
    }

    @Override
    public boolean equals(Object o) {
        boolean result = false;
        if (o instanceof DataFile) {
            result = compareTo((DataFile)o) == 0;
        }
        return result;
    }

    @Override
    public int hashCode() {
        return dataFileId;
    }
}
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessor.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessor.java?rev=677944&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessor.java (added) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessor.java Fri Jul 18 08:49:48 2008 @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.kahadb.impl.async;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Map;

import org.apache.kahadb.impl.async.DataFileAppender.WriteCommand;
import org.apache.kahadb.impl.async.DataFileAppender.WriteKey;
import org.apache.kahadb.util.ByteSequence;

/**
 * Optimized Store reader and updater. Single threaded and synchronous. Use in
 * conjunction with the DataFileAccessorPool for concurrent use.
 * <p>
 * Reads consult the data manager's map of in-flight writes first, and only
 * fall back to the file when no pending write exists for the location.
 *
 * @version $Revision: 1.1.1.1 $
 */
final class DataFileAccessor {

    private final DataFile dataFile;
    private final Map inflightWrites;
    private final RandomAccessFile file;
    private boolean disposed;

    /**
     * Construct a Store reader
     *
     * @param dataManager supplies the map of in-flight writes
     * @param dataFile the data file to open
     * @throws IOException if the data file cannot be opened
     */
    public DataFileAccessor(AsyncDataManager dataManager, DataFile dataFile) throws IOException {
        this.dataFile = dataFile;
        this.inflightWrites = dataManager.getInflightWrites();
        this.file = dataFile.openRandomAccessFile(false);
    }

    public DataFile getDataFile() {
        return dataFile;
    }

    /** Closes the underlying file handle. Safe to call more than once. */
    public void dispose() {
        if (disposed) {
            return;
        }
        disposed = true;
        try {
            dataFile.closeRandomAccessFile(file);
        } catch (IOException e) {
            // Best effort: report the failure but keep disposing.
            e.printStackTrace();
        }
    }

    /**
     * Reads the payload of the record at the given location. When the
     * location's size is not set, it is read from the file and stored in the
     * location.
     *
     * @return the record data, taken from the pending write if one exists
     * @throws IOException if the location is invalid or the read fails
     */
    public ByteSequence readRecord(Location location) throws IOException {

        if (!location.isValid()) {
            throw new IOException("Invalid location: " + location);
        }

        WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
        if (asyncWrite != null) {
            return asyncWrite.data;
        }

        try {

            if (location.getSize() == Location.NOT_SET) {
                file.seek(location.getOffset());
                location.setSize(file.readInt());
            }
            file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);

            byte[] data = new byte[location.getSize() - AsyncDataManager.ITEM_HEAD_FOOT_SPACE];
            file.readFully(data);
            return new ByteSequence(data, 0, data.length);

        } catch (RuntimeException e) {
            // Keep the original failure attached so it is not lost in the log.
            IOException ioe = new IOException("Invalid location: " + location + ", : " + e);
            ioe.initCause(e);
            throw ioe;
        }
    }

    /**
     * Fills in the size and type of the record at the location's offset,
     * taken from the pending write if one exists, otherwise from the file.
     */
    public void readLocationDetails(Location location) throws IOException {
        WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
        if (asyncWrite != null) {
            location.setSize(asyncWrite.location.getSize());
            location.setType(asyncWrite.location.getType());
        } else {
            file.seek(location.getOffset());
            location.setSize(file.readInt());
            location.setType(file.readByte());
        }
    }

    /**
     * Like {@link #readLocationDetails(Location)}, but when reading from the
     * file it also checks the start-of-record and end-of-record markers.
     *
     * @return {@code false} if a marker does not match or the read fails
     */
    public boolean readLocationDetailsAndValidate(Location location) {
        try {
            WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
            if (asyncWrite != null) {
                location.setSize(asyncWrite.location.getSize());
                location.setType(asyncWrite.location.getType());
            } else {
                file.seek(location.getOffset());
                location.setSize(file.readInt());
                location.setType(file.readByte());

                byte data[] = new byte[3];
                file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_OFFSET_TO_SOR);
                file.readFully(data);
                if (data[0] != AsyncDataManager.ITEM_HEAD_SOR[0]
                    || data[1] != AsyncDataManager.ITEM_HEAD_SOR[1]
                    || data[2] != AsyncDataManager.ITEM_HEAD_SOR[2]) {
                    return false;
                }
                file.seek(location.getOffset() + location.getSize() - AsyncDataManager.ITEM_FOOT_SPACE);
                file.readFully(data);
                if (data[0] != AsyncDataManager.ITEM_HEAD_EOR[0]
                    || data[1] != AsyncDataManager.ITEM_HEAD_EOR[1]
                    || data[2] != AsyncDataManager.ITEM_HEAD_EOR[2]) {
                    return false;
                }
            }
        } catch (IOException e) {
            return false;
        }
        return true;
    }

    /**
     * Overwrites the payload of an existing record in place. Data longer than
     * the record's payload is truncated.
     *
     * @param sync whether to force the change to disk before returning
     */
    public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {

        file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
        // The location size covers header and footer too (see readRecord), so
        // only size - ITEM_HEAD_FOOT_SPACE bytes may be written without
        // running over the record's footer.
        int size = Math.min(data.getLength(), location.getSize() - AsyncDataManager.ITEM_HEAD_FOOT_SPACE);
        file.write(data.getData(), data.getOffset(), size);
        if (sync) {
            file.getFD().sync();
        }

    }

}
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessorPool.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessorPool.java?rev=677944&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessorPool.java (added) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessorPool.java Fri Jul 18 08:49:48 2008 @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.kahadb.impl.async;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Used to pool DataFileAccessors. Accessors are pooled per data file, keyed
 * by the data file id, and at most {@code maxOpenReadersPerFile} idle
 * accessors are kept for each file.
 *
 * @author chirino
 */
public class DataFileAccessorPool {

    private final AsyncDataManager dataManager;
    private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
    private boolean closed;
    private int maxOpenReadersPerFile = 5;

    /** The idle accessors of a single data file plus usage bookkeeping. */
    class Pool {

        private final DataFile file;
        private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
        private boolean used;
        private int openCounter;
        private boolean disposed;

        public Pool(DataFile file) {
            this.file = file;
        }

        /**
         * Hands out an idle accessor, or opens a new one when none is idle,
         * and marks the pool as used.
         */
        public synchronized DataFileAccessor openDataFileReader() throws IOException {
            DataFileAccessor rc;
            if (pool.isEmpty()) {
                rc = new DataFileAccessor(dataManager, file);
            } else {
                rc = pool.remove(pool.size() - 1);
            }
            used = true;
            openCounter++;
            return rc;
        }

        /**
         * Takes an accessor back. It is disposed instead of pooled when the
         * pool is full or has already been disposed.
         */
        public synchronized void closeDataFileReader(DataFileAccessor reader) {
            openCounter--;
            if (pool.size() >= maxOpenReadersPerFile || disposed) {
                reader.dispose();
            } else {
                pool.add(reader);
            }
        }

        public synchronized void clearUsedMark() {
            used = false;
        }

        public synchronized boolean isUsed() {
            return used;
        }

        /** Disposes all idle accessors and marks the pool as disposed. */
        public synchronized void dispose() {
            for (DataFileAccessor reader : pool) {
                reader.dispose();
            }
            pool.clear();
            disposed = true;
        }

        /** @return the number of accessors handed out and not yet returned */
        public synchronized int getOpenCounter() {
            return openCounter;
        }

    }

    public DataFileAccessorPool(AsyncDataManager dataManager) {
        this.dataManager = dataManager;
    }

    /** Clears the used mark of every per-file pool. */
    synchronized void clearUsedMark() {
        for (Pool pool : pools.values()) {
            pool.clearUsedMark();
        }
    }

    /**
     * Disposes the per-file pools that have not been used since the previous
     * sweep and currently have no accessor handed out.
     */
    synchronized void disposeUnused() {
        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
            Pool pool = iter.next();
            if (pool.isUsed()) {
                // The used mark is set on every open. Clear it here so a pool
                // that stays idle until the next sweep becomes eligible for
                // disposal; otherwise no pool could ever be reclaimed.
                pool.clearUsedMark();
            } else if (pool.getOpenCounter() == 0) {
                pool.dispose();
                iter.remove();
            }
        }
    }

    /**
     * Disposes the pool of the given data file.
     *
     * @throws IllegalStateException if this pool is closed or an accessor of
     *         the data file is still handed out
     */
    synchronized void disposeDataFileAccessors(DataFile dataFile) {
        if (closed) {
            throw new IllegalStateException("Closed.");
        }
        Pool pool = pools.get(dataFile.getDataFileId());
        if (pool != null) {
            if (pool.getOpenCounter() == 0) {
                pool.dispose();
                pools.remove(dataFile.getDataFileId());
            } else {
                throw new IllegalStateException("The data file is still in use: " + dataFile + ", use count: " + pool.getOpenCounter());
            }
        }
    }

    /**
     * Hands out an accessor for the data file, creating the per-file pool on
     * first use.
     *
     * @throws IOException if this pool is closed or the file cannot be opened
     */
    synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
        if (closed) {
            throw new IOException("Closed.");
        }

        Pool pool = pools.get(dataFile.getDataFileId());
        if (pool == null) {
            pool = new Pool(dataFile);
            pools.put(dataFile.getDataFileId(), pool);
        }
        return pool.openDataFileReader();
    }

    /**
     * Takes an accessor back. It is disposed right away when this pool is
     * closed or the per-file pool no longer exists.
     */
    synchronized void closeDataFileAccessor(DataFileAccessor reader) {
        Pool pool = pools.get(reader.getDataFile().getDataFileId());
        if (pool == null || closed) {
            reader.dispose();
        } else {
            pool.closeDataFileReader(reader);
        }
    }

    /** Disposes every per-file pool. Safe to call more than once. */
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        for (Pool pool : pools.values()) {
            pool.dispose();
        }
        pools.clear();
    }

}