activemq-commits mailing list archives

From chir...@apache.org
Subject svn commit: r677944 [2/11] - in /activemq/sandbox/kahadb: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/kahadb/ src/main/java/org/apache/kahadb/impl/ src/main/java/org/apache/kahadb/impl/async/ s...
Date Fri, 18 Jul 2008 15:49:52 GMT
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/KahaStore.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/KahaStore.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/KahaStore.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,588 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.ContainerId;
+import org.apache.kahadb.ListContainer;
+import org.apache.kahadb.MapContainer;
+import org.apache.kahadb.Store;
+import org.apache.kahadb.StoreLocation;
+import org.apache.kahadb.impl.async.AsyncDataManager;
+import org.apache.kahadb.impl.async.DataManagerFacade;
+import org.apache.kahadb.impl.container.ListContainerImpl;
+import org.apache.kahadb.impl.container.MapContainerImpl;
+import org.apache.kahadb.impl.data.DataManagerImpl;
+import org.apache.kahadb.impl.data.Item;
+import org.apache.kahadb.impl.data.RedoListener;
+import org.apache.kahadb.impl.index.IndexItem;
+import org.apache.kahadb.impl.index.IndexManager;
+import org.apache.kahadb.impl.index.RedoStoreIndexItem;
+import org.apache.kahadb.util.IOHelper;
+
+/**
+ * Store Implementation
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class KahaStore implements Store {
+
+    private static final String PROPERTY_PREFIX = "org.apache.activemq.kaha.Store";
+    private static final boolean BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
+                                                                                     + ".FileLockBroken",
+                                                                                     "false"));
+    private static final boolean DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
+                                                                                    + ".DisableLocking",
+                                                                                    "false"));
+    // According to the String javadoc, all string constants are interned, so this will be the
+    // same object throughout the JVM and we can use it as the monitor for the lock set.
+    private static final String LOCKSET_MONITOR = PROPERTY_PREFIX + ".Lock.Monitor";
+    private static final Log LOG = LogFactory.getLog(KahaStore.class);
+
+    private final File directory;
+    private final String mode;
+    private IndexRootContainer mapsContainer;
+    private IndexRootContainer listsContainer;
+    private Map<ContainerId, ListContainerImpl> lists = new ConcurrentHashMap<ContainerId, ListContainerImpl>();
+    private Map<ContainerId, MapContainerImpl> maps = new ConcurrentHashMap<ContainerId, MapContainerImpl>();
+    private Map<String, DataManager> dataManagers = new ConcurrentHashMap<String, DataManager>();
+    private Map<String, IndexManager> indexManagers = new ConcurrentHashMap<String, IndexManager>();
+    private IndexManager rootIndexManager; // contains all the root indexes
+    private boolean closed;
+    private boolean initialized;
+    private boolean logIndexChanges;
+    private boolean useAsyncDataManager;
+    private long maxDataFileLength = 1024 * 1024 * 32;
+    private FileLock lock;
+    private boolean persistentIndex = true;
+    private RandomAccessFile lockFile;
+    private final AtomicLong storeSize;
+    private String defaultContainerName = DEFAULT_CONTAINER_NAME;
+
+    
+    public KahaStore(String name, String mode) throws IOException {
+        this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, new AtomicLong());
+    }
+
+    public KahaStore(File directory, String mode) throws IOException {
+        this(directory, mode, new AtomicLong());
+    }
+
+    public KahaStore(String name, String mode, AtomicLong storeSize) throws IOException {
+        this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, storeSize);
+    }
+    
+    public KahaStore(File directory, String mode, AtomicLong storeSize) throws IOException {
+        this.mode = mode;
+        this.storeSize = storeSize;
+        this.directory = directory;
+        IOHelper.mkdirs(this.directory);
+    }
+
+    public synchronized void close() throws IOException {
+        if (!closed) {
+            closed = true;
+            if (initialized) {
+                unlock();
+                for (ListContainerImpl container : lists.values()) {
+                    container.close();
+                }
+                lists.clear();
+                for (MapContainerImpl container : maps.values()) {
+                    container.close();
+                }
+                maps.clear();
+                for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
+                    IndexManager im = iter.next();
+                    im.close();
+                    iter.remove();
+                }
+                for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
+                    DataManager dm = iter.next();
+                    dm.close();
+                    iter.remove();
+                }
+            }
+            if (lockFile!=null) {
+                lockFile.close();
+                lockFile=null;
+            }
+        }
+    }
+
+    public synchronized void force() throws IOException {
+        if (initialized) {
+            for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
+                IndexManager im = iter.next();
+                im.force();
+            }
+            for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
+                DataManager dm = iter.next();
+                dm.force();
+            }
+        }
+    }
+
+    public synchronized void clear() throws IOException {
+        initialize();
+        for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
+            ContainerId id = (ContainerId)i.next();
+            MapContainer container = getMapContainer(id.getKey(), id.getDataContainerName());
+            container.clear();
+        }
+        for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
+            ContainerId id = (ContainerId)i.next();
+            ListContainer container = getListContainer(id.getKey(), id.getDataContainerName());
+            container.clear();
+        }
+
+    }
+
+    public synchronized boolean delete() throws IOException {
+        boolean result = true;
+        if (initialized) {
+            clear();
+            for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
+                IndexManager im = iter.next();
+                result &= im.delete();
+                iter.remove();
+            }
+            for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
+                DataManager dm = iter.next();
+                result &= dm.delete();
+                iter.remove();
+            }
+        }
+        if (directory != null && directory.isDirectory()) {
+            result = IOHelper.deleteChildren(directory);
+            String str = result ? "successfully deleted" : "failed to delete";
+            LOG.info("Kaha Store " + str + " data directory " + directory);
+        }
+        return result;
+    }
+
+    public synchronized boolean isInitialized() {
+        return initialized;
+    }
+
+    public boolean doesMapContainerExist(Object id) throws IOException {
+        return doesMapContainerExist(id, defaultContainerName);
+    }
+
+    public synchronized boolean doesMapContainerExist(Object id, String containerName) throws IOException {
+        initialize();
+        ContainerId containerId = new ContainerId();
+        containerId.setKey(id);
+        containerId.setDataContainerName(containerName);
+        return maps.containsKey(containerId) || mapsContainer.doesRootExist(containerId);
+    }
+
+    public MapContainer getMapContainer(Object id) throws IOException {
+        return getMapContainer(id, defaultContainerName);
+    }
+
+    public MapContainer getMapContainer(Object id, String containerName) throws IOException {
+        return getMapContainer(id, containerName, persistentIndex);
+    }
+
+    public synchronized MapContainer getMapContainer(Object id, String originalContainerName, boolean persistentIndex)
+        throws IOException {
+        initialize();
+        String containerName = IOHelper.toFileSystemSafeName(originalContainerName);
+        ContainerId containerId = new ContainerId();
+        containerId.setKey(id);
+        containerId.setDataContainerName(containerName);
+        MapContainerImpl result = maps.get(containerId);
+        if (result == null) {
+            DataManager dm = getDataManager(containerName);
+            IndexManager im = getIndexManager(dm, containerName);
+            IndexItem root = mapsContainer.getRoot(im, containerId);
+            if (root == null) {
+                root = mapsContainer.addRoot(im, containerId);
+            }
+            result = new MapContainerImpl(directory, containerId, root, im, dm, persistentIndex);
+            maps.put(containerId, result);
+        }
+        return result;
+    }
+
+    public void deleteMapContainer(Object id) throws IOException {
+        deleteMapContainer(id, defaultContainerName);
+    }
+
+    public void deleteMapContainer(Object id, String containerName) throws IOException {
+        ContainerId containerId = new ContainerId(id, containerName);
+        deleteMapContainer(containerId);
+    }
+
+    public synchronized void deleteMapContainer(ContainerId containerId) throws IOException {
+        initialize();
+        MapContainerImpl container = maps.remove(containerId);
+        if (container != null) {
+            container.clear();
+            mapsContainer.removeRoot(container.getIndexManager(), containerId);
+            container.close();
+        }
+    }
+
+    public synchronized Set<ContainerId> getMapContainerIds() throws IOException {
+        initialize();
+        Set<ContainerId> set = new HashSet<ContainerId>();
+        for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
+            ContainerId id = (ContainerId)i.next();
+            set.add(id);
+        }
+        return set;
+    }
+
+    public boolean doesListContainerExist(Object id) throws IOException {
+        return doesListContainerExist(id, defaultContainerName);
+    }
+
+    public synchronized boolean doesListContainerExist(Object id, String containerName) throws IOException {
+        initialize();
+        ContainerId containerId = new ContainerId();
+        containerId.setKey(id);
+        containerId.setDataContainerName(containerName);
+        return lists.containsKey(containerId) || listsContainer.doesRootExist(containerId);
+    }
+
+    public ListContainer getListContainer(Object id) throws IOException {
+        return getListContainer(id, defaultContainerName);
+    }
+
+    public ListContainer getListContainer(Object id, String containerName) throws IOException {
+        return getListContainer(id, containerName, persistentIndex);
+    }
+
+    public synchronized ListContainer getListContainer(Object id, String originalContainerName,
+                                                       boolean persistentIndex) throws IOException {
+        initialize();
+        String containerName = IOHelper.toFileSystemSafeName(originalContainerName);
+        ContainerId containerId = new ContainerId();
+        containerId.setKey(id);
+        containerId.setDataContainerName(containerName);
+        ListContainerImpl result = lists.get(containerId);
+        if (result == null) {
+            DataManager dm = getDataManager(containerName);
+            IndexManager im = getIndexManager(dm, containerName);
+
+            IndexItem root = listsContainer.getRoot(im, containerId);
+            if (root == null) {
+                root = listsContainer.addRoot(im, containerId);
+            }
+            result = new ListContainerImpl(containerId, root, im, dm, persistentIndex);
+            lists.put(containerId, result);
+        }
+        return result;
+    }
+
+    public void deleteListContainer(Object id) throws IOException {
+        deleteListContainer(id, defaultContainerName);
+    }
+
+    public synchronized void deleteListContainer(Object id, String containerName) throws IOException {
+        ContainerId containerId = new ContainerId(id, containerName);
+        deleteListContainer(containerId);
+    }
+
+    public synchronized void deleteListContainer(ContainerId containerId) throws IOException {
+        initialize();
+        ListContainerImpl container = lists.remove(containerId);
+        if (container != null) {
+            listsContainer.removeRoot(container.getIndexManager(), containerId);
+            container.clear();
+            container.close();
+        }
+    }
+
+    public synchronized Set<ContainerId> getListContainerIds() throws IOException {
+        initialize();
+        Set<ContainerId> set = new HashSet<ContainerId>();
+        for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
+            ContainerId id = (ContainerId)i.next();
+            set.add(id);
+        }
+        return set;
+    }
+
+    /**
+     * @return the listsContainer
+     */
+    public IndexRootContainer getListsContainer() {
+        return this.listsContainer;
+    }
+
+    /**
+     * @return the mapsContainer
+     */
+    public IndexRootContainer getMapsContainer() {
+        return this.mapsContainer;
+    }
+
+    public synchronized DataManager getDataManager(String name) throws IOException {
+        DataManager dm = dataManagers.get(name);
+        if (dm == null) {
+            if (isUseAsyncDataManager()) {
+                AsyncDataManager t = new AsyncDataManager(storeSize);
+                t.setDirectory(directory);
+                t.setFilePrefix("async-data-" + name + "-");
+                t.setMaxFileLength((int)maxDataFileLength);
+                t.start();
+                dm = new DataManagerFacade(t, name);
+            } else {
+                DataManagerImpl t = new DataManagerImpl(directory, name, storeSize);
+                t.setMaxFileLength(maxDataFileLength);
+                dm = t;
+            }
+            if (logIndexChanges) {
+                recover(dm);
+            }
+            dataManagers.put(name, dm);
+        }
+        return dm;
+    }
+
+    public synchronized IndexManager getIndexManager(DataManager dm, String name) throws IOException {
+        IndexManager im = indexManagers.get(name);
+        if (im == null) {
+            im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null, storeSize);
+            indexManagers.put(name, im);
+        }
+        return im;
+    }
+
+    private void recover(final DataManager dm) throws IOException {
+        dm.recoverRedoItems(new RedoListener() {
+            public void onRedoItem(StoreLocation item, Object o) throws Exception {
+                RedoStoreIndexItem redo = (RedoStoreIndexItem)o;
+                // IndexManager im = getIndexManager(dm, redo.getIndexName());
+                IndexManager im = getIndexManager(dm, dm.getName());
+                im.redo(redo);
+            }
+        });
+    }
+
+    public synchronized boolean isLogIndexChanges() {
+        return logIndexChanges;
+    }
+
+    public synchronized void setLogIndexChanges(boolean logIndexChanges) {
+        this.logIndexChanges = logIndexChanges;
+    }
+
+    /**
+     * @return the maxDataFileLength
+     */
+    public synchronized long getMaxDataFileLength() {
+        return maxDataFileLength;
+    }
+
+    /**
+     * @param maxDataFileLength the maxDataFileLength to set
+     */
+    public synchronized void setMaxDataFileLength(long maxDataFileLength) {
+        this.maxDataFileLength = maxDataFileLength;
+    }
+
+    /**
+     * @see org.apache.kahadb.IndexTypes
+     * @return the default index type
+     */
+    public synchronized String getIndexTypeAsString() {
+        return persistentIndex ? "PERSISTENT" : "VM";
+    }
+
+    /**
+     * Set the default index type
+     * 
+     * @param type
+     * @see org.apache.kahadb.IndexTypes
+     */
+    public synchronized void setIndexTypeAsString(String type) {
+        if (type.equalsIgnoreCase("VM")) {
+            persistentIndex = false;
+        } else {
+            persistentIndex = true;
+        }
+    }
+    
+    public boolean isPersistentIndex() {
+        return persistentIndex;
+    }
+
+    public void setPersistentIndex(boolean persistentIndex) {
+        this.persistentIndex = persistentIndex;
+    }
+
+    public synchronized boolean isUseAsyncDataManager() {
+        return useAsyncDataManager;
+    }
+
+    public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) {
+        this.useAsyncDataManager = useAsyncWriter;
+    }
+
+    /**
+     * @return the current size of the store, in bytes
+     * @see org.apache.kahadb.Store#size()
+     */
+    public long size() {
+        return storeSize.get();
+    }
+
+    public String getDefaultContainerName() {
+        return defaultContainerName;
+    }
+
+    public void setDefaultContainerName(String defaultContainerName) {
+        this.defaultContainerName = defaultContainerName;
+    }
+
+    public synchronized void initialize() throws IOException {
+        if (closed) {
+            throw new IOException("Store has been closed.");
+        }
+        if (!initialized) {
+            LOG.info("Kaha Store using data directory " + directory);
+            lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
+            lock();
+            DataManager defaultDM = getDataManager(defaultContainerName);
+            rootIndexManager = getIndexManager(defaultDM, defaultContainerName);
+            IndexItem mapRoot = new IndexItem();
+            IndexItem listRoot = new IndexItem();
+            if (rootIndexManager.isEmpty()) {
+                mapRoot.setOffset(0);
+                rootIndexManager.storeIndex(mapRoot);
+                listRoot.setOffset(IndexItem.INDEX_SIZE);
+                rootIndexManager.storeIndex(listRoot);
+                rootIndexManager.setLength(IndexItem.INDEX_SIZE * 2);
+            } else {
+                mapRoot = rootIndexManager.getIndex(0);
+                listRoot = rootIndexManager.getIndex(IndexItem.INDEX_SIZE);
+            }
+            initialized = true;
+            mapsContainer = new IndexRootContainer(mapRoot, rootIndexManager, defaultDM);
+            listsContainer = new IndexRootContainer(listRoot, rootIndexManager, defaultDM);
+            // Add interest in data files - then consolidate them
+            generateInterestInMapDataFiles();
+            generateInterestInListDataFiles();
+            for (Iterator<DataManager> i = dataManagers.values().iterator(); i.hasNext();) {
+                DataManager dm = i.next();
+                dm.consolidateDataFiles();
+            }
+        }
+    }
+
+    private void lock() throws IOException {
+        synchronized (LOCKSET_MONITOR) {
+            if (!DISABLE_LOCKING && directory != null && lock == null) {
+                String key = getPropertyKey();
+                String property = System.getProperty(key);
+                if (null == property) {
+                    if (!BROKEN_FILE_LOCK) {
+                        lock = lockFile.getChannel().tryLock();
+                        if (lock == null) {
+                            throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by another application");
+                        } else {
+                            System.setProperty(key, new Date().toString());
+                        }
+                    }
+                } else { // already locked
+                    throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by this application.");
+                }
+            }
+        }
+    }
+
+    private void unlock() throws IOException {
+        synchronized (LOCKSET_MONITOR) {
+            if (!DISABLE_LOCKING && (null != directory) && (null != lock)) {
+                System.getProperties().remove(getPropertyKey());
+                if (lock.isValid()) {
+                    lock.release();
+                }
+                lock = null;
+            }
+        }
+    }
+
+
+    private String getPropertyKey() throws IOException {
+        return getClass().getName() + ".lock." + directory.getCanonicalPath();
+    }
+
+    /**
+     * Walks the list container roots, creating the required IndexManager and
+     * DataManager instances and registering interest in the data files
+     * referenced by each index item.
+     * 
+     * @throws IOException
+     */
+    private void generateInterestInListDataFiles() throws IOException {
+        for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
+            ContainerId id = (ContainerId)i.next();
+            DataManager dm = getDataManager(id.getDataContainerName());
+            IndexManager im = getIndexManager(dm, id.getDataContainerName());
+            IndexItem theRoot = listsContainer.getRoot(im, id);
+            long nextItem = theRoot.getNextItem();
+            while (nextItem != Item.POSITION_NOT_SET) {
+                IndexItem item = im.getIndex(nextItem);
+                item.setOffset(nextItem);
+                dm.addInterestInFile(item.getKeyFile());
+                dm.addInterestInFile(item.getValueFile());
+                nextItem = item.getNextItem();
+            }
+        }
+    }
+
+    /**
+     * Walks the map container roots, creating the required IndexManager and
+     * DataManager instances and registering interest in the data files
+     * referenced by each index item.
+     * 
+     * @throws IOException
+     */
+    private void generateInterestInMapDataFiles() throws IOException {
+        for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
+            ContainerId id = (ContainerId)i.next();
+            DataManager dm = getDataManager(id.getDataContainerName());
+            IndexManager im = getIndexManager(dm, id.getDataContainerName());
+            IndexItem theRoot = mapsContainer.getRoot(im, id);
+            long nextItem = theRoot.getNextItem();
+            while (nextItem != Item.POSITION_NOT_SET) {
+                IndexItem item = im.getIndex(nextItem);
+                item.setOffset(nextItem);
+                dm.addInterestInFile(item.getKeyFile());
+                dm.addInterestInFile(item.getValueFile());
+                nextItem = item.getNextItem();
+            }
+
+        }
+    }
+}
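
For orientation, a minimal open/use/close cycle against the KahaStore API above.
This is a sketch only: the directory name is illustrative, and the "rw" mode
string is assumed to be a RandomAccessFile-style mode passed through to the
index files.

    import java.io.IOException;
    import org.apache.kahadb.impl.KahaStore;
    import org.apache.kahadb.impl.StoreLockedExcpetion;

    public class KahaStoreExample {
        public static void main(String[] args) throws IOException {
            KahaStore store = new KahaStore("example-store", "rw"); // illustrative name/mode
            try {
                // Acquires the file lock; throws StoreLockedExcpetion if already held.
                store.initialize();
            } catch (StoreLockedExcpetion e) {
                System.err.println("Store already in use: " + e.getMessage());
                return;
            }
            try {
                // Creates (or reopens) a named map container root on first use.
                store.getMapContainer("catalog");
            } finally {
                store.close(); // closes containers and managers, releases the lock
            }
        }
    }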

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/StoreLockedExcpetion.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/StoreLockedExcpetion.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/StoreLockedExcpetion.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/StoreLockedExcpetion.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown if the store is in use by another application
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class StoreLockedExcpetion extends IOException {
+
+    private static final long serialVersionUID = 3857646689671366926L;
+
+    /**
+     * Default Constructor
+     */
+    public StoreLockedExcpetion() {
+    }
+
+    /**
+     * @param s
+     */
+    public StoreLockedExcpetion(String s) {
+        super(s);
+    }
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/AsyncDataManager.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/AsyncDataManager.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/AsyncDataManager.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,753 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.async;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.impl.async.DataFileAppender.WriteCommand;
+import org.apache.kahadb.impl.async.DataFileAppender.WriteKey;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.IOHelper;
+import org.apache.kahadb.util.Scheduler;
+
+
+/**
+ * Manages DataFiles
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class AsyncDataManager {
+
+    public static final int CONTROL_RECORD_MAX_LENGTH = 1024;
+    public static final int ITEM_HEAD_RESERVED_SPACE = 21;
+    // ITEM_HEAD_SPACE = length + type + reserved space + SOR
+    public static final int ITEM_HEAD_SPACE = 4 + 1 + ITEM_HEAD_RESERVED_SPACE + 3;
+    public static final int ITEM_HEAD_OFFSET_TO_SOR = ITEM_HEAD_SPACE - 3;
+    public static final int ITEM_FOOT_SPACE = 3; // EOR
+
+    public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE + ITEM_FOOT_SPACE;
+
+    public static final byte[] ITEM_HEAD_SOR = new byte[] {'S', 'O', 'R'};
+    public static final byte[] ITEM_HEAD_EOR = new byte[] {'E', 'O', 'R'};
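+    // On-disk record layout implied by the constants above (a sketch):
+    //   [length:4][type:1][reserved:21][S O R] payload [E O R]
+    //   head = 4 + 1 + 21 + 3 = 29 bytes, foot = 3 bytes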
+
+    public static final byte DATA_ITEM_TYPE = 1;
+    public static final byte REDO_ITEM_TYPE = 2;
+    public static final String DEFAULT_DIRECTORY = "data";
+    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
+    public static final String DEFAULT_FILE_PREFIX = "data-";
+    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
+    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
+    public static final int PREFERED_DIFF = 1024 * 512;
+
+    private static final Log LOG = LogFactory.getLog(AsyncDataManager.class);
+
+    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
+
+    protected File directory = new File(DEFAULT_DIRECTORY);
+    protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
+    protected String filePrefix = DEFAULT_FILE_PREFIX;
+    protected ControlFile controlFile;
+    protected boolean started;
+    protected boolean useNio = true;
+
+    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
+    protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
+
+    protected DataFileAppender appender;
+    protected DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
+
+    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
+    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
+    protected DataFile currentWriteFile;
+
+    protected Location mark;
+    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
+    protected Runnable cleanupTask;
+    protected final AtomicLong storeSize;
+    protected boolean archiveDataLogs;
+    
+    public AsyncDataManager(AtomicLong storeSize) {
+        this.storeSize=storeSize;
+    }
+    
+    public AsyncDataManager() {
+        this(new AtomicLong());
+    }
+
+    @SuppressWarnings("unchecked")
+    public synchronized void start() throws IOException {
+        if (started) {
+            return;
+        }
+
+        started = true;
+        preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
+        lock();
+
+        ByteSequence sequence = controlFile.load();
+        if (sequence != null && sequence.getLength() > 0) {
+            unmarshallState(sequence);
+        }
+        if (useNio) {
+            appender = new NIODataFileAppender(this);
+        } else {
+            appender = new DataFileAppender(this);
+        }
+
+        File[] files = directory.listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String n) {
+                return dir.equals(directory) && n.startsWith(filePrefix);
+            }
+        });
+       
+        if (files != null) {
+            for (int i = 0; i < files.length; i++) {
+                try {
+                    File file = files[i];
+                    String n = file.getName();
+                    String numStr = n.substring(filePrefix.length(), n.length());
+                    int num = Integer.parseInt(numStr);
+                    DataFile dataFile = new DataFile(file, num, preferedFileLength);
+                    fileMap.put(dataFile.getDataFileId(), dataFile);
+                    storeSize.addAndGet(dataFile.getLength());
+                } catch (NumberFormatException e) {
+                    // Ignore files that do not match the pattern.
+                }
+            }
+
+            // Sort the list so that we can link the DataFiles together in the
+            // right order.
+            List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
+            Collections.sort(l);
+            currentWriteFile = null;
+            for (DataFile df : l) {
+                if (currentWriteFile != null) {
+                    currentWriteFile.linkAfter(df);
+                }
+                currentWriteFile = df;
+                fileByFileMap.put(df.getFile(), df);
+            }
+        }
+
+        // Need to check the current Write File to see if there was a partial
+        // write to it.
+        if (currentWriteFile != null) {
+
+            // See if the lastSyncedLocation is valid..
+            Location l = lastAppendLocation.get();
+            if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
+                l = null;
+            }
+
+            // If we know the last location that was ok.. then we can skip lots
+            // of checking
+            try {
+                l = recoveryCheck(currentWriteFile, l);
+                lastAppendLocation.set(l);
+            } catch (IOException e) {
+                LOG.warn("recovery check failed", e);
+            }
+        }
+
+        storeState(false);
+
+        cleanupTask = new Runnable() {
+            public void run() {
+                cleanup();
+            }
+        };
+        Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
+    }
+
+    public void lock() throws IOException {
+        synchronized (this) {
+            if (controlFile == null) {
+                IOHelper.mkdirs(directory);
+                controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH);
+            }
+            controlFile.lock();
+        }
+    }
+
+    protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
+        if (location == null) {
+            location = new Location();
+            location.setDataFileId(dataFile.getDataFileId());
+            location.setOffset(0);
+        }
+        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+        try {
+            reader.readLocationDetails(location);
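+            // Walk forward from the given location while records validate;
+            // the last good offset becomes the file's logical length below.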
+            while (reader.readLocationDetailsAndValidate(location)) {
+                location.setOffset(location.getOffset() + location.getSize());
+            }
+        } finally {
+            accessorPool.closeDataFileAccessor(reader);
+        }
+        dataFile.setLength(location.getOffset());
+        return location;
+    }
+
+    protected void unmarshallState(ByteSequence sequence) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
+        DataInputStream dis = new DataInputStream(bais);
+        if (dis.readBoolean()) {
+            mark = new Location();
+            mark.readExternal(dis);
+        } else {
+            mark = null;
+        }
+        if (dis.readBoolean()) {
+            Location l = new Location();
+            l.readExternal(dis);
+            lastAppendLocation.set(l);
+        } else {
+            lastAppendLocation.set(null);
+        }
+    }
+
+    private synchronized ByteSequence marshallState() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+
+        if (mark != null) {
+            dos.writeBoolean(true);
+            mark.writeExternal(dos);
+        } else {
+            dos.writeBoolean(false);
+        }
+        Location l = lastAppendLocation.get();
+        if (l != null) {
+            dos.writeBoolean(true);
+            l.writeExternal(dos);
+        } else {
+            dos.writeBoolean(false);
+        }
+
+        byte[] bs = baos.toByteArray();
+        return new ByteSequence(bs, 0, bs.length);
+    }
+
+    synchronized DataFile allocateLocation(Location location) throws IOException {
+        if (currentWriteFile == null || ((currentWriteFile.getLength() + location.getSize()) > maxFileLength)) {
+            int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1;
+
+            String fileName = filePrefix + nextNum;
+            File file = new File(directory, fileName);
+            DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
+            //actually allocate the disk space
+            nextWriteFile.closeRandomAccessFile(nextWriteFile.openRandomAccessFile(true));
+            fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
+            fileByFileMap.put(file, nextWriteFile);
+            if (currentWriteFile != null) {
+                currentWriteFile.linkAfter(nextWriteFile);
+                if (currentWriteFile.isUnused()) {
+                    removeDataFile(currentWriteFile);
+                }
+            }
+            currentWriteFile = nextWriteFile;
+
+        }
+        location.setOffset(currentWriteFile.getLength());
+        location.setDataFileId(currentWriteFile.getDataFileId().intValue());
+        int size = location.getSize();
+        currentWriteFile.incrementLength(size);
+        currentWriteFile.increment();
+        storeSize.addAndGet(size);
+        return currentWriteFile;
+    }
+    
+    public synchronized void removeLocation(Location location) throws IOException {
+        DataFile dataFile = getDataFile(location);
+        dataFile.decrement();
+    }
+
+    synchronized DataFile getDataFile(Location item) throws IOException {
+        Integer key = Integer.valueOf(item.getDataFileId());
+        DataFile dataFile = fileMap.get(key);
+        if (dataFile == null) {
+            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+            throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
+        }
+        return dataFile;
+    }
+    
+    synchronized File getFile(Location item) throws IOException {
+        Integer key = Integer.valueOf(item.getDataFileId());
+        DataFile dataFile = fileMap.get(key);
+        if (dataFile == null) {
+            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+            throw new IOException("Could not locate data file " + filePrefix  + item.getDataFileId());
+        }
+        return dataFile.getFile();
+    }
+
+    private DataFile getNextDataFile(DataFile dataFile) {
+        return (DataFile)dataFile.getNext();
+    }
+
+    public synchronized void close() throws IOException {
+        if (!started) {
+            return;
+        }
+        Scheduler.cancel(cleanupTask);
+        accessorPool.close();
+        storeState(false);
+        appender.close();
+        fileMap.clear();
+        fileByFileMap.clear();
+        controlFile.unlock();
+        controlFile.dispose();
+        started = false;
+    }
+
+    synchronized void cleanup() {
+        if (accessorPool != null) {
+            accessorPool.disposeUnused();
+        }
+    }
+
+    public synchronized boolean delete() throws IOException {
+
+        // Close all open file handles...
+        appender.close();
+        accessorPool.close();
+
+        boolean result = true;
+        for (Iterator i = fileMap.values().iterator(); i.hasNext();) {
+            DataFile dataFile = (DataFile)i.next();
+            storeSize.addAndGet(-dataFile.getLength());
+            result &= dataFile.delete();
+        }
+        fileMap.clear();
+        fileByFileMap.clear();
+        lastAppendLocation.set(null);
+        mark = null;
+        currentWriteFile = null;
+
+        // reopen open file handles...
+        accessorPool = new DataFileAccessorPool(this);
+        if (useNio) {
+            appender = new NIODataFileAppender(this);
+        } else {
+            appender = new DataFileAppender(this);
+        }
+        return result;
+    }
+
+    public synchronized void addInterestInFile(int file) throws IOException {
+        if (file >= 0) {
+            Integer key = Integer.valueOf(file);
+            DataFile dataFile = (DataFile)fileMap.get(key);
+            if (dataFile == null) {
+                throw new IOException("That data file does not exist");
+            }
+            addInterestInFile(dataFile);
+        }
+    }
+
+    synchronized void addInterestInFile(DataFile dataFile) {
+        if (dataFile != null) {
+            dataFile.increment();
+        }
+    }
+
+    public synchronized void removeInterestInFile(int file) throws IOException {
+        if (file >= 0) {
+            Integer key = Integer.valueOf(file);
+            DataFile dataFile = (DataFile)fileMap.get(key);
+            removeInterestInFile(dataFile);
+        }
+    }
+
+    synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
+        if (dataFile != null) {
+            if (dataFile.decrement() <= 0) {
+                removeDataFile(dataFile);
+            }
+        }
+    }
+
+    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer> inProgress) throws IOException {
+        Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
+        unUsed.removeAll(inUse);
+        unUsed.removeAll(inProgress);
+
+        List<DataFile> purgeList = new ArrayList<DataFile>();
+        for (Integer key : unUsed) {
+            DataFile dataFile = (DataFile)fileMap.get(key);
+            purgeList.add(dataFile);
+        }
+        for (DataFile dataFile : purgeList) {
+            if (dataFile.getDataFileId() != currentWriteFile.getDataFileId()) {
+                forceRemoveDataFile(dataFile);
+            }
+        }
+    }
+
+    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
+        Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
+        unUsed.removeAll(inUse);
+
+        List<DataFile> purgeList = new ArrayList<DataFile>();
+        for (Integer key : unUsed) {
+            // Only add files less than the lastFile..
+            if (key.intValue() < lastFile.intValue()) {
+                DataFile dataFile = (DataFile)fileMap.get(key);
+                purgeList.add(dataFile);
+            }
+        }
+        for (DataFile dataFile : purgeList) {
+            forceRemoveDataFile(dataFile);
+        }
+    }
+
+    public synchronized void consolidateDataFiles() throws IOException {
+        List<DataFile> purgeList = new ArrayList<DataFile>();
+        for (DataFile dataFile : fileMap.values()) {
+            if (dataFile.isUnused()) {
+                purgeList.add(dataFile);
+            }
+        }
+        for (DataFile dataFile : purgeList) {
+            removeDataFile(dataFile);
+        }
+    }
+
+    private synchronized void removeDataFile(DataFile dataFile) throws IOException {
+
+        // Make sure we don't delete too much data.
+        if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
+            LOG.debug("Won't remove DataFile" + dataFile);
+        	return;
+        }
+        forceRemoveDataFile(dataFile);
+    }
+    
+    private synchronized void forceRemoveDataFile(DataFile dataFile)
+            throws IOException {
+        accessorPool.disposeDataFileAccessors(dataFile);
+        fileByFileMap.remove(dataFile.getFile());
+        fileMap.remove(dataFile.getDataFileId());
+        storeSize.addAndGet(-dataFile.getLength());
+        dataFile.unlink();
+        if (archiveDataLogs) {
+            dataFile.move(getDirectoryArchive());
+            LOG.info("moved data file " + dataFile + " to "
+                    + getDirectoryArchive());
+        } else {
+            boolean result = dataFile.delete();
+            LOG.info("discarding data file " + dataFile + " "
+                    + (result ? "succeeded" : "failed"));
+        }
+    }
+
+    /**
+     * @return the maxFileLength
+     */
+    public int getMaxFileLength() {
+        return maxFileLength;
+    }
+
+    /**
+     * @param maxFileLength the maxFileLength to set
+     */
+    public void setMaxFileLength(int maxFileLength) {
+        this.maxFileLength = maxFileLength;
+    }
+
+    public String toString() {
+        return "DataManager:(" + filePrefix + ")";
+    }
+
+    public synchronized Location getMark() throws IllegalStateException {
+        return mark;
+    }
+
+    public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
+
+        Location cur = null;
+        while (true) {
+            if (cur == null) {
+                if (location == null) {
+                    DataFile head = (DataFile)currentWriteFile.getHeadNode();
+                    cur = new Location();
+                    cur.setDataFileId(head.getDataFileId());
+                    cur.setOffset(0);
+                } else {
+                    // Set to the next offset..
+                    if (location.getSize() == -1) {
+                        cur = new Location(location);
+                    } else {
+                        cur = new Location(location);
+                        cur.setOffset(location.getOffset() + location.getSize());
+                    }
+                }
+            } else {
+                cur.setOffset(cur.getOffset() + cur.getSize());
+            }
+
+            DataFile dataFile = getDataFile(cur);
+
+            // Did it go into the next file??
+            if (dataFile.getLength() <= cur.getOffset()) {
+                dataFile = getNextDataFile(dataFile);
+                if (dataFile == null) {
+                    return null;
+                } else {
+                    cur.setDataFileId(dataFile.getDataFileId().intValue());
+                    cur.setOffset(0);
+                }
+            }
+
+            // Load in location size and type.
+            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+            try {
+                reader.readLocationDetails(cur);
+            } finally {
+                accessorPool.closeDataFileAccessor(reader);
+            }
+
+            if (cur.getType() == 0) {
+                return null;
+            } else if (cur.getType() > 0) {
+                // Only return user records.
+                return cur;
+            }
+        }
+    }
+    
+    public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
+        DataFile df = fileByFileMap.get(file);
+        return getNextLocation(df, lastLocation, thisFileOnly);
+    }
+
+    public synchronized Location getNextLocation(DataFile dataFile,
+            Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {
+
+        Location cur = null;
+        while (true) {
+            if (cur == null) {
+                if (lastLocation == null) {
+                    DataFile head = (DataFile)dataFile.getHeadNode();
+                    cur = new Location();
+                    cur.setDataFileId(head.getDataFileId());
+                    cur.setOffset(0);
+                } else {
+                    // Set to the next offset..
+                    cur = new Location(lastLocation);
+                    cur.setOffset(cur.getOffset() + cur.getSize());
+                }
+            } else {
+                cur.setOffset(cur.getOffset() + cur.getSize());
+            }
+
+
+            // Did it go into the next file??
+            if (dataFile.getLength() <= cur.getOffset()) {
+                if (thisFileOnly) {
+                    return null;
+                } else {
+                    dataFile = getNextDataFile(dataFile);
+                    if (dataFile == null) {
+                        return null;
+                    } else {
+                        cur.setDataFileId(dataFile.getDataFileId().intValue());
+                        cur.setOffset(0);
+                    }
+                }
+            }
+
+            // Load in location size and type.
+            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+            try {
+                reader.readLocationDetails(cur);
+            } finally {
+                accessorPool.closeDataFileAccessor(reader);
+            }
+
+            if (cur.getType() == 0) {
+                return null;
+            } else if (cur.getType() > 0) {
+                // Only return user records.
+                return cur;
+            }
+        }
+    }
+
+    public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
+        DataFile dataFile = getDataFile(location);
+        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+        ByteSequence rc = null;
+        try {
+            rc = reader.readRecord(location);
+        } finally {
+            accessorPool.closeDataFileAccessor(reader);
+        }
+        return rc;
+    }
+
+    public void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
+        synchronized (this) {
+            mark = location;
+        }
+        storeState(sync);
+    }
+
+    protected synchronized void storeState(boolean sync) throws IOException {
+        ByteSequence state = marshallState();
+        appender.storeItem(state, Location.MARK_TYPE, sync);
+        controlFile.store(state, sync);
+    }
+
+    public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
+        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
+        return loc;
+    }
+    
+    public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
+        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
+        return loc;
+    }
+
+    public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
+        return appender.storeItem(data, type, sync);
+    }
+
+    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
+        DataFile dataFile = getDataFile(location);
+        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
+        try {
+            updater.updateRecord(location, data, sync);
+        } finally {
+            accessorPool.closeDataFileAccessor(updater);
+        }
+    }
+
+    public File getDirectory() {
+        return directory;
+    }
+
+    public void setDirectory(File directory) {
+        this.directory = directory;
+    }
+
+    public String getFilePrefix() {
+        return filePrefix;
+    }
+
+    public void setFilePrefix(String filePrefix) {
+        this.filePrefix = filePrefix;
+    }
+
+    public Map<WriteKey, WriteCommand> getInflightWrites() {
+        return inflightWrites;
+    }
+
+    public Location getLastAppendLocation() {
+        return lastAppendLocation.get();
+    }
+
+    public void setLastAppendLocation(Location lastSyncedLocation) {
+        this.lastAppendLocation.set(lastSyncedLocation);
+    }
+
+    public boolean isUseNio() {
+        return useNio;
+    }
+
+    public void setUseNio(boolean useNio) {
+        this.useNio = useNio;
+    }
+
+    public File getDirectoryArchive() {
+        return directoryArchive;
+    }
+
+    public void setDirectoryArchive(File directoryArchive) {
+        this.directoryArchive = directoryArchive;
+    }
+    
+    public boolean isArchiveDataLogs() {
+        return archiveDataLogs;
+    }
+
+    public void setArchiveDataLogs(boolean archiveDataLogs) {
+        this.archiveDataLogs = archiveDataLogs;
+    }
+
+    public synchronized Integer getCurrentDataFileId() {
+        if (currentWriteFile == null) {
+            return null;
+        }
+        return currentWriteFile.getDataFileId();
+    }
+    
+    /**
+     * Get a set of files - only valid after start()
+     * @return files currently being used
+     */
+    public Set<File> getFiles() {
+        return fileByFileMap.keySet();
+    }
+
+    public synchronized long getDiskSize() {
+        long rc = 0;
+        DataFile cur = (DataFile)currentWriteFile.getHeadNode();
+        while (cur != null) {
+            rc += cur.getLength();
+            cur = (DataFile)cur.getNext();
+        }
+        return rc;
+    }
+
+    public synchronized long getDiskSizeUntil(Location startPosition) {
+        long rc = 0;
+        DataFile cur = (DataFile)currentWriteFile.getHeadNode();
+        while (cur != null) {
+            if (cur.getDataFileId().intValue() >= startPosition.getDataFileId()) {
+                return rc + startPosition.getOffset();
+            }
+            rc += cur.getLength();
+            cur = (DataFile)cur.getNext();
+        }
+        return rc;
+    }
+
+}
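
For orientation, a minimal write/read cycle against AsyncDataManager, using only
methods visible above. A sketch under assumptions: the directory and file prefix
are illustrative, and Location is taken to be the companion class in this package.

    import java.io.File;
    import java.io.IOException;
    import org.apache.kahadb.impl.async.AsyncDataManager;
    import org.apache.kahadb.impl.async.Location;
    import org.apache.kahadb.util.ByteSequence;

    public class AsyncDataManagerExample {
        public static void main(String[] args) throws IOException {
            AsyncDataManager dm = new AsyncDataManager();
            dm.setDirectory(new File("async-data")); // illustrative path
            dm.setFilePrefix("data-example-");       // illustrative prefix
            dm.start(); // locks the control file and scans existing data files
            try {
                byte[] payload = "hello".getBytes();
                // Synchronous write; returns the Location of the new record.
                Location loc = dm.write(new ByteSequence(payload, 0, payload.length), true);
                ByteSequence roundTrip = dm.read(loc); // read the record back
                System.out.println("read " + roundTrip.getLength() + " bytes");
            } finally {
                dm.close(); // cancels the cleanup task and releases the control lock
            }
        }
    }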

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ControlFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ControlFile.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ControlFile.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/ControlFile.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.async;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.IOExceptionSupport;
+
+/**
+ * Used to reliably store fixed-size state data. The state is kept in a record
+ * that is versioned and written twice in the file, so that a failure in the
+ * middle of writing either the first or the second record does not result in
+ * an unknown state.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public final class ControlFile {
+
+    private static final boolean DISABLE_FILE_LOCK = "true".equals(System.getProperty("java.nio.channels.FileLock.broken", "false"));
+    private final File file;
+
+    /** The File that holds the control data. */
+    private final RandomAccessFile randomAccessFile;
+    private final int maxRecordSize;
+    private final int firstRecordStart;
+    private final int secondRecordStart;
+    private final int firstRecordEnd;
+    private final int secondRecordEnd;
+
+    private long version;
+    private FileLock lock;
+    private boolean disposed;
+
+    public ControlFile(File file, int recordSize) throws IOException {
+        this.file = file;
+        this.maxRecordSize = recordSize + 4;
+        
+        // Calculate where the records start and end.
+        this.firstRecordStart = 8;
+        this.secondRecordStart = 8 + maxRecordSize + 8 + 8;
+        this.firstRecordEnd = firstRecordStart + maxRecordSize;
+        this.secondRecordEnd = secondRecordStart + maxRecordSize;
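+        // Resulting file layout (total length = 2 * maxRecordSize + 32 bytes):
+        //   [v1:8][record1:maxRecordSize][v1check:8][v2:8][record2:maxRecordSize][v2check:8]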
+
+        randomAccessFile = new RandomAccessFile(file, "rw");
+    }
+
+    /**
+     * Locks the control file.
+     * 
+     * @throws IOException if the control file cannot be locked
+     */
+    public void lock() throws IOException {
+        if (DISABLE_FILE_LOCK) {
+            return;
+        }
+
+        if (lock == null) {
+            try {
+                lock = randomAccessFile.getChannel().tryLock();
+            } catch (OverlappingFileLockException e) {
+                throw IOExceptionSupport.create("Control file '" + file + "' could not be locked.",e);
+            }
+            if (lock == null) {
+                throw new IOException("Control file '" + file + "' could not be locked.");
+            }
+        }
+    }
+
+    /**
+     * Unlocks the control file.
+     * 
+     * @throws IOException if releasing the file lock fails
+     */
+    public void unlock() throws IOException {
+        if (DISABLE_FILE_LOCK) {
+            return;
+        }
+
+        if (lock != null) {
+            lock.release();
+            lock = null;
+        }
+    }
+
+    public void dispose() {
+        if (disposed) {
+            return;
+        }
+        disposed = true;
+        try {
+            unlock();
+        } catch (IOException ignore) {
+        }
+        try {
+            randomAccessFile.close();
+        } catch (IOException ignore) {
+        }
+    }
+
+    public synchronized ByteSequence load() throws IOException {
+        long l = randomAccessFile.length();
+        if (l < (maxRecordSize * 2) + 32) {
+            return null;
+        }
+
+        randomAccessFile.seek(firstRecordStart - 8);
+        long v1 = randomAccessFile.readLong();
+        randomAccessFile.seek(firstRecordEnd);
+        long v1check = randomAccessFile.readLong();
+
+        randomAccessFile.seek(secondRecordStart - 8);
+        long v2 = randomAccessFile.readLong();
+        randomAccessFile.seek(secondRecordEnd);
+        long v2check = randomAccessFile.readLong();
+
+        byte[] data = null;
+        if (v2 == v2check) {
+            version = v2;
+            randomAccessFile.seek(secondRecordStart);
+            int size = randomAccessFile.readInt();
+            data = new byte[size];
+            randomAccessFile.readFully(data);
+        } else if (v1 == v1check) {
+            version = v1;
+            randomAccessFile.seek(firstRecordStart);
+            int size = randomAccessFile.readInt();
+            data = new byte[size];
+            randomAccessFile.readFully(data);
+        } else {
+            // Both version checks failed, so neither record copy can be
+            // trusted. This should only happen if the data got corrupted.
+            throw new IOException("Control data corrupted.");
+        }
+        return new ByteSequence(data, 0, data.length);
+    }
+
+    public synchronized void store(ByteSequence data, boolean sync) throws IOException {
+
+        version++;
+        randomAccessFile.setLength((maxRecordSize * 2) + 32);
+        randomAccessFile.seek(0);
+
+        // Write the first copy of the control data.
+        randomAccessFile.writeLong(version);
+        randomAccessFile.writeInt(data.getLength());
+        randomAccessFile.write(data.getData(), data.getOffset(), data.getLength());
+        randomAccessFile.seek(firstRecordEnd);
+        randomAccessFile.writeLong(version);
+
+        // Write the second copy of the control data.
+        randomAccessFile.writeLong(version);
+        randomAccessFile.writeInt(data.getLength());
+        randomAccessFile.write(data.getData(), data.getOffset(), data.getLength());
+        randomAccessFile.seek(secondRecordEnd);
+        randomAccessFile.writeLong(version);
+
+        if (sync) {
+            randomAccessFile.getFD().sync();
+        }
+    }
+
+}
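
For readers skimming the diff, a minimal sketch of how the dual-record ControlFile
might be used from application code. The file name and the 64-byte record size are
made up for illustration; real callers size the record to their own serialized state.

    import java.io.File;
    import java.io.IOException;

    import org.apache.kahadb.impl.async.ControlFile;
    import org.apache.kahadb.util.ByteSequence;

    public class ControlFileExample {
        public static void main(String[] args) throws IOException {
            ControlFile control = new ControlFile(new File("example.control"), 64);
            control.lock();
            try {
                // load() returns null until a record has been stored at least once.
                ByteSequence previous = control.load();
                if (previous == null) {
                    byte[] state = new byte[64]; // serialize the real state here
                    control.store(new ByteSequence(state, 0, state.length), true);
                }
            } finally {
                control.dispose(); // releases the lock and closes the file
            }
        }
    }

Since store() bumps the version and rewrites both copies, a crash mid-write leaves
at most one copy with a mismatched version check, and load() falls back to the
other copy.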

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFile.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFile.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFile.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.async;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.kahadb.util.IOHelper;
+import org.apache.kahadb.util.LinkedNode;
+
+/**
+ * DataFile
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class DataFile extends LinkedNode implements Comparable<DataFile> {
+
+    protected final File file;
+    protected final Integer dataFileId;
+    protected final int preferedSize;
+
+    protected int length;
+    protected int referenceCount;
+
+    DataFile(File file, int number, int preferedSize) {
+        this.file = file;
+        this.preferedSize = preferedSize;
+        this.dataFileId = Integer.valueOf(number);
+        length = (int)(file.exists() ? file.length() : 0);
+    }
+    
+    File getFile() {
+        return file;
+    }
+
+    public Integer getDataFileId() {
+        return dataFileId;
+    }
+
+    public synchronized int getLength() {
+        return length;
+    }
+
+    public synchronized void setLength(int length) {
+        this.length = length;
+    }
+
+    public synchronized void incrementLength(int size) {
+        length += size;
+    }
+
+    public synchronized int increment() {
+        return ++referenceCount;
+    }
+
+    public synchronized int decrement() {
+        return --referenceCount;
+    }
+    
+    public synchronized int getReferenceCount() {
+        return referenceCount;
+    }
+
+    public synchronized boolean isUnused() {
+        return referenceCount <= 0;
+    }
+
+    public synchronized String toString() {
+        String result = file.getName() + " number = " + dataFileId + ", length = " + length + ", refCount = " + referenceCount;
+        return result;
+    }
+
+    public synchronized RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
+        RandomAccessFile rc = new RandomAccessFile(file, "rw");
+        // When we start to write a file, size it up front so that the OS has a
+        // chance to allocate the file contiguously.
+        if (appender) {
+            if (length < preferedSize) {
+                rc.setLength(preferedSize);
+            }
+        }
+        return rc;
+    }
+
+    public synchronized void closeRandomAccessFile(RandomAccessFile raf) throws IOException {
+        // On close, trim the file back to its real size.
+        if (length != raf.length()) {
+            raf.setLength(getLength());
+        }
+        raf.close();
+    }
+
+    public synchronized boolean delete() throws IOException {
+        return file.delete();
+    }
+    
+    public synchronized void move(File targetDirectory) throws IOException{
+        IOHelper.moveFile(file,targetDirectory);
+    }
+
+    public int compareTo(DataFile df) {
+        return dataFileId - df.dataFileId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        boolean result = false;
+        if (o instanceof DataFile) {
+            result = compareTo((DataFile)o) == 0;
+        }
+        return result;
+    }
+
+    @Override
+    public int hashCode() {
+        return dataFileId;
+    }
+}
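
A hedged sketch of the reference-counting lifecycle this class supports. The
constructor is package-private, so the sketch lives in the same package; the file
name, id, and preferred size are invented for illustration.

    package org.apache.kahadb.impl.async;

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    class DataFileLifecycleExample {
        public static void main(String[] args) throws IOException {
            DataFile df = new DataFile(new File("data-1.log"), 1, 32 * 1024 * 1024);

            df.increment();                        // a stored record now references this file
            RandomAccessFile raf = df.openRandomAccessFile(true); // appender mode pre-sizes
            raf.write(new byte[128]);              // stand-in for a journal record
            df.incrementLength(128);
            df.closeRandomAccessFile(raf);         // trims the file back to its logical length

            df.decrement();                        // release the reference
            if (df.isUnused()) {
                df.delete();                       // no references remain; reclaim the file
            }
        }
    }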

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessor.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessor.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessor.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.async;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Map;
+
+import org.apache.kahadb.impl.async.DataFileAppender.WriteCommand;
+import org.apache.kahadb.impl.async.DataFileAppender.WriteKey;
+import org.apache.kahadb.util.ByteSequence;
+
+/**
+ * Optimized Store reader and updater. Single threaded and synchronous. Use in
+ * conjunction with the DataFileAccessorPool for concurrent use.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+final class DataFileAccessor {
+
+    private final DataFile dataFile;
+    private final Map<WriteKey, WriteCommand> inflightWrites;
+    private final RandomAccessFile file;
+    private boolean disposed;
+
+    /**
+     * Constructs a Store reader.
+     * 
+     * @param dataManager the data manager that tracks in-flight writes
+     * @param dataFile the data file to read from
+     * @throws IOException
+     */
+    public DataFileAccessor(AsyncDataManager dataManager, DataFile dataFile) throws IOException {
+        this.dataFile = dataFile;
+        this.inflightWrites = dataManager.getInflightWrites();
+        this.file = dataFile.openRandomAccessFile(false);
+    }
+
+    public DataFile getDataFile() {
+        return dataFile;
+    }
+
+    public void dispose() {
+        if (disposed) {
+            return;
+        }
+        disposed = true;
+        try {
+            dataFile.closeRandomAccessFile(file);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public ByteSequence readRecord(Location location) throws IOException {
+
+        if (!location.isValid()) {
+            throw new IOException("Invalid location: " + location);
+        }
+
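+        // Serve the read from the in-flight write map when the record is
+        // still queued by the appender and not yet flushed to disk.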
+        WriteCommand asyncWrite = inflightWrites.get(new WriteKey(location));
+        if (asyncWrite != null) {
+            return asyncWrite.data;
+        }
+
+        try {
+
+            if (location.getSize() == Location.NOT_SET) {
+                file.seek(location.getOffset());
+                location.setSize(file.readInt());
+                file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
+            } else {
+                file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
+            }
+
+            byte[] data = new byte[location.getSize() - AsyncDataManager.ITEM_HEAD_FOOT_SPACE];
+            file.readFully(data);
+            return new ByteSequence(data, 0, data.length);
+
+        } catch (RuntimeException e) {
+            throw new IOException("Invalid location: " + location + ", : " + e);
+        }
+    }
+
+    public void readLocationDetails(Location location) throws IOException {
+        WriteCommand asyncWrite = inflightWrites.get(new WriteKey(location));
+        if (asyncWrite != null) {
+            location.setSize(asyncWrite.location.getSize());
+            location.setType(asyncWrite.location.getType());
+        } else {
+            file.seek(location.getOffset());
+            location.setSize(file.readInt());
+            location.setType(file.readByte());
+        }
+    }
+
+    public boolean readLocationDetailsAndValidate(Location location) {
+        try {
+            WriteCommand asyncWrite = inflightWrites.get(new WriteKey(location));
+            if (asyncWrite != null) {
+                location.setSize(asyncWrite.location.getSize());
+                location.setType(asyncWrite.location.getType());
+            } else {
+                file.seek(location.getOffset());
+                location.setSize(file.readInt());
+                location.setType(file.readByte());
+
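+                // Validate the start-of-record (SOR) and end-of-record (EOR)
+                // marker bytes that frame each journal record on disk.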
+                byte[] data = new byte[3];
+                file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_OFFSET_TO_SOR);
+                file.readFully(data);
+                if (data[0] != AsyncDataManager.ITEM_HEAD_SOR[0]
+                    || data[1] != AsyncDataManager.ITEM_HEAD_SOR[1]
+                    || data[2] != AsyncDataManager.ITEM_HEAD_SOR[2]) {
+                    return false;
+                }
+                file.seek(location.getOffset() + location.getSize() - AsyncDataManager.ITEM_FOOT_SPACE);
+                file.readFully(data);
+                if (data[0] != AsyncDataManager.ITEM_HEAD_EOR[0]
+                    || data[1] != AsyncDataManager.ITEM_HEAD_EOR[1]
+                    || data[2] != AsyncDataManager.ITEM_HEAD_EOR[2]) {
+                    return false;
+                }
+            }
+        } catch (IOException e) {
+            return false;
+        }
+        return true;
+    }
+
+    public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
+
+        file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
+        int size = Math.min(data.getLength(), location.getSize());
+        file.write(data.getData(), data.getOffset(), size);
+        if (sync) {
+            file.getFD().sync();
+        }
+
+    }
+
+}
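
A minimal sketch of the intended read path through the pool, assuming an
AsyncDataManager, DataFile, and Location are already in hand. The accessor and
pool types are package-private, so the helper is shown in the same package; the
class and method names are invented.

    package org.apache.kahadb.impl.async;

    import java.io.IOException;

    import org.apache.kahadb.util.ByteSequence;

    class JournalReadExample {
        static ByteSequence read(DataFileAccessorPool pool, DataFile dataFile,
                                 Location location) throws IOException {
            DataFileAccessor reader = pool.openDataFileAccessor(dataFile);
            try {
                // readRecord() consults the in-flight write map first, so records
                // queued by the appender are readable before they reach disk.
                return reader.readRecord(location);
            } finally {
                // Returns the accessor to the per-file pool, or disposes it if
                // the pool is already full or closed.
                pool.closeDataFileAccessor(reader);
            }
        }
    }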

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessorPool.java?rev=677944&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessorPool.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/DataFileAccessorPool.java Fri Jul 18 08:49:48 2008
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.impl.async;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Used to pool DataFileAccessors.
+ * 
+ * @author chirino
+ */
+public class DataFileAccessorPool {
+
+    private final AsyncDataManager dataManager;
+    private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
+    private boolean closed;
+    private int maxOpenReadersPerFile = 5;
+
+    class Pool {
+
+        private final DataFile file;
+        private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
+        private boolean used;
+        private int openCounter;
+        private boolean disposed;
+
+        public Pool(DataFile file) {
+            this.file = file;
+        }
+
+        public synchronized DataFileAccessor openDataFileReader() throws IOException {
+            DataFileAccessor rc;
+            if (pool.isEmpty()) {
+                rc = new DataFileAccessor(dataManager, file);
+            } else {
+                rc = pool.remove(pool.size() - 1);
+            }
+            used = true;
+            openCounter++;
+            return rc;
+        }
+
+        public synchronized void closeDataFileReader(DataFileAccessor reader) {
+            openCounter--;
+            if (pool.size() >= maxOpenReadersPerFile || disposed) {
+                reader.dispose();
+            } else {
+                pool.add(reader);
+            }
+        }
+
+        public synchronized void clearUsedMark() {
+            used = false;
+        }
+
+        public synchronized boolean isUsed() {
+            return used;
+        }
+
+        public synchronized void dispose() {
+            for (DataFileAccessor reader : pool) {
+                reader.dispose();
+            }
+            pool.clear();
+            disposed = true;
+        }
+
+        public synchronized int getOpenCounter() {
+            return openCounter;
+        }
+
+    }
+
+    public DataFileAccessorPool(AsyncDataManager dataManager) {
+        this.dataManager = dataManager;
+    }
+
+    synchronized void clearUsedMark() {
+        for (Pool pool : pools.values()) {
+            pool.clearUsedMark();
+        }
+    }
+
+    synchronized void disposeUnused() {
+        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
+            Pool pool = iter.next();
+            if (!pool.isUsed()) {
+                pool.dispose();
+                iter.remove();
+            }
+        }
+    }
+
+    synchronized void disposeDataFileAccessors(DataFile dataFile) {
+        if (closed) {
+            throw new IllegalStateException("Closed.");
+        }
+        Pool pool = pools.get(dataFile.getDataFileId());
+        if (pool != null) {
+            if (pool.getOpenCounter() == 0) {
+                pool.dispose();
+                pools.remove(dataFile.getDataFileId());
+            } else {
+                throw new IllegalStateException("The data file is still in use: " + dataFile + ", use count: " + pool.getOpenCounter());
+            }
+        }
+    }
+
+    synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
+        if (closed) {
+            throw new IOException("Closed.");
+        }
+
+        Pool pool = pools.get(dataFile.getDataFileId());
+        if (pool == null) {
+            pool = new Pool(dataFile);
+            pools.put(dataFile.getDataFileId(), pool);
+        }
+        return pool.openDataFileReader();
+    }
+
+    synchronized void closeDataFileAccessor(DataFileAccessor reader) {
+        Pool pool = pools.get(reader.getDataFile().getDataFileId());
+        if (pool == null || closed) {
+            reader.dispose();
+        } else {
+            pool.closeDataFileReader(reader);
+        }
+    }
+
+    public synchronized void close() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
+            Pool pool = iter.next();
+            pool.dispose();
+        }
+        pools.clear();
+    }
+
+}
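
Finally, a hedged sketch of the idle-reader sweep the used-mark methods enable:
dispose the per-file pools that were never marked used since the previous sweep,
then clear the marks so normal reads can set them again. The timer scheduling is
illustrative, not part of this commit.

    package org.apache.kahadb.impl.async;

    import java.util.Timer;
    import java.util.TimerTask;

    class AccessorPoolSweeper {
        static Timer schedule(final DataFileAccessorPool pool, long intervalMillis) {
            Timer timer = new Timer("accessor-pool-sweeper", true);
            timer.scheduleAtFixedRate(new TimerTask() {
                public void run() {
                    // Any openDataFileAccessor() call since the last sweep marks
                    // that file's pool as used, so it survives this pass.
                    pool.disposeUnused();
                    pool.clearUsedMark();
                }
            }, intervalMillis, intervalMillis);
            return timer;
        }
    }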


