Author: rajdavies Date: Thu Nov 1 11:37:28 2007 New Revision: 591113 URL: http://svn.apache.org/viewvc?rev=591113&view=rev Log: Fix for https://issues.apache.org/activemq/browse/AMQ-1485 Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java?rev=591113&r1=591112&r2=591113&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java Thu Nov 1 11:37:28 2007 @@ -222,4 +222,40 @@ * @return the StoreEntry */ StoreEntry getEntry(K key); + + /** + * Set the index bin size + * @param size + */ + void setIndexBinSize(int size); + + /** + * @return index bin size + */ + int getIndexBinSize(); + + + /** + * Add the index key size + * @param size + */ + void setIndexKeySize(int size); + + + /** + * @return the index key size + */ + int getIndexKeySize(); + + + /** + * Set the index page size + * @param size + */ + void setIndexPageSize(int size); + + /** + * @return the index page size + */ + int getIndexPageSize(); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?rev=591113&r1=591112&r2=591113&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Thu Nov 1 11:37:28 2007 @@ -52,6 +52,9 @@ protected Marshaller keyMarshaller = Store.OBJECT_MARSHALLER; protected Marshaller valueMarshaller = Store.OBJECT_MARSHALLER; protected File directory; + private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE; + private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE; + private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE; public MapContainerImpl(File directory, ContainerId id, IndexItem root, IndexManager indexManager, DataManager dataManager, boolean persistentIndex) { @@ -66,7 +69,11 @@ String name = containerId.getDataContainerName() + "_" + containerId.getKey(); name = name.replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"); try { - this.index = new HashIndex(directory, name, indexManager); + HashIndex hashIndex = new HashIndex(directory, name, indexManager); + hashIndex.setNumberOfBins(getIndexBinSize()); + hashIndex.setKeySize(getIndexKeySize()); + hashIndex.setPageSize(getIndexPageSize()); + this.index = hashIndex; } catch (IOException e) { LOG.error("Failed to create HashIndex", e); throw new RuntimeException(e); @@ -527,4 +534,30 @@ } return index; } + + public int getIndexBinSize() { + return indexBinSize; + } + + public void setIndexBinSize(int indexBinSize) { + this.indexBinSize = indexBinSize; + } + + public int getIndexKeySize() { + return indexKeySize; + } + + public void setIndexKeySize(int indexKeySize) { + this.indexKeySize = indexKeySize; + } + + public int getIndexPageSize() { + return indexPageSize; + } + + public void setIndexPageSize(int indexPageSize) { + this.indexPageSize = indexPageSize; + } + + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java?rev=591113&r1=591112&r2=591113&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java Thu Nov 1 11:37:28 2007 @@ -37,10 +37,10 @@ * @version $Revision: 1.1.1.1 $ */ public class HashIndex implements Index { - + public static final int DEFAULT_PAGE_SIZE; + public static final int DEFAULT_KEY_SIZE; + public static final int DEFAULT_BIN_SIZE; private static final String NAME_PREFIX = "hash-index-"; - private static final int DEFAULT_PAGE_SIZE; - private static final int DEFAULT_KEY_SIZE; private static final Log LOG = LogFactory.getLog(HashIndex.class); private final String name; private File directory; @@ -49,6 +49,7 @@ private IndexManager indexManager; private int pageSize = DEFAULT_PAGE_SIZE; private int keySize = DEFAULT_KEY_SIZE; + private int numberOfBins = DEFAULT_BIN_SIZE; private int keysPerPage = pageSize / keySize; private DataByteArrayInputStream dataIn; private DataByteArrayOutputStream dataOut; @@ -60,21 +61,10 @@ private HashPage lastFree; private AtomicBoolean loaded = new AtomicBoolean(); private LRUCache pageCache; - private boolean enablePageCaching; - private int pageCacheSize = 10; - - /** - * Constructor - * - * @param directory - * @param name - * @param indexManager - * @throws IOException - */ - public HashIndex(File directory, String name, IndexManager indexManager) throws IOException { - this(directory, name, indexManager, 1024); - } + private boolean enablePageCaching=true; + private int pageCacheSize = 1; + /** * Constructor * @@ -84,15 +74,9 @@ * @param numberOfBins * @throws IOException */ - public HashIndex(File directory, String name, IndexManager indexManager, int numberOfBins) throws IOException { + public HashIndex(File directory, String name, IndexManager indexManager) throws IOException { this.directory = directory; this.name = name; - this.indexManager = indexManager; - int capacity = 1; - while (capacity < numberOfBins) { - capacity <<= 1; - } - this.bins = new HashBin[capacity]; openIndexFile(); pageCache = new LRUCache(pageCacheSize, pageCacheSize, 0.75f, true); } @@ -139,6 +123,23 @@ } this.pageSize = pageSize; } + + /** + * @return number of bins + */ + public int getNumberOfBins() { + return this.numberOfBins; + } + + /** + * @param numberOfBins + */ + public void setNumberOfBins(int numberOfBins) { + if (loaded.get() && numberOfBins != this.numberOfBins) { + throw new RuntimeException("Pages already loaded - can't reset bin size"); + } + this.numberOfBins = numberOfBins; + } /** * @return the enablePageCaching @@ -175,6 +176,12 @@ public synchronized void load() { if (loaded.compareAndSet(false, true)) { + this.indexManager = indexManager; + int capacity = 1; + while (capacity < numberOfBins) { + capacity <<= 1; + } + this.bins = new HashBin[capacity]; keysPerPage = pageSize / keySize; dataIn = new DataByteArrayInputStream(); dataOut = new DataByteArrayOutputStream(pageSize); @@ -439,5 +446,6 @@ static { DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "16384")); DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96")); + DEFAULT_BIN_SIZE= Integer.parseInt(System.getProperty("defaultBinSize", "1024")); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java?rev=591113&r1=591112&r2=591113&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java Thu Nov 1 11:37:28 2007 @@ -85,9 +85,6 @@ * @param marshaller */ public void setKeyMarshaller(Marshaller marshaller) { - if (loaded.get()) { - throw new RuntimeException("Pages already loaded - can't set marshaller now"); - } this.keyMarshaller = marshaller; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=591113&r1=591112&r2=591113&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Thu Nov 1 11:37:28 2007 @@ -44,6 +44,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.kaha.impl.async.AsyncDataManager; import org.apache.activemq.kaha.impl.async.Location; +import org.apache.activemq.kaha.impl.index.hash.HashIndex; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; @@ -105,6 +106,9 @@ private boolean persistentIndex=true; private boolean useNio = true; private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH; + private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE; + private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE; + private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE; private Map> dataFilesInProgress = new ConcurrentHashMap> (); @@ -628,6 +632,9 @@ protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException { KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize); adaptor.setPersistentIndex(isPersistentIndex()); + adaptor.setIndexBinSize(getIndexBinSize()); + adaptor.setIndexKeySize(getIndexKeySize()); + adaptor.setIndexPageSize(getIndexPageSize()); return adaptor; } @@ -756,6 +763,31 @@ public void setCheckpointInterval(long checkpointInterval) { this.checkpointInterval = checkpointInterval; } + + public int getIndexBinSize() { + return indexBinSize; + } + + public void setIndexBinSize(int indexBinSize) { + this.indexBinSize = indexBinSize; + } + + public int getIndexKeySize() { + return indexKeySize; + } + + public void setIndexKeySize(int indexKeySize) { + this.indexKeySize = indexKeySize; + } + + public int getIndexPageSize() { + return indexPageSize; + } + + public void setIndexPageSize(int indexPageSize) { + this.indexPageSize = indexPageSize; + } + protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) { Setset = dataFilesInProgress.get(store); @@ -771,7 +803,5 @@ if (set != null) { set.remove(dataFileId); } - } - - + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=591113&r1=591112&r2=591113&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Thu Nov 1 11:37:28 2007 @@ -39,6 +39,7 @@ import org.apache.activemq.kaha.MessageIdMarshaller; import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreFactory; +import org.apache.activemq.kaha.impl.index.hash.HashIndex; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.ReferenceStore; import org.apache.activemq.store.ReferenceStoreAdapter; @@ -63,6 +64,9 @@ private boolean storeValid; private Store stateStore; private boolean persistentIndex = true; + private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE; + private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE; + private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE; public KahaReferenceStoreAdapter(AtomicLong size){ super(size); @@ -176,7 +180,10 @@ String containerName) throws IOException { Store store = getStore(); - MapContainer container = store.getMapContainer(id, containerName); + MapContainer container = store.getMapContainer(id, containerName,persistentIndex); + container.setIndexBinSize(getIndexBinSize()); + container.setIndexKeySize(getIndexKeySize()); + container.setIndexPageSize(getIndexPageSize()); container.setKeyMarshaller(new MessageIdMarshaller()); container.setValueMarshaller(new ReferenceRecordMarshaller()); container.load(); @@ -306,6 +313,30 @@ protected void removeSubscriberState(SubscriptionInfo info) { durableSubscribers.remove(info); + } + + public int getIndexBinSize() { + return indexBinSize; + } + + public void setIndexBinSize(int indexBinSize) { + this.indexBinSize = indexBinSize; + } + + public int getIndexKeySize() { + return indexKeySize; + } + + public void setIndexKeySize(int indexKeySize) { + this.indexKeySize = indexKeySize; + } + + public int getIndexPageSize() { + return indexPageSize; + } + + public void setIndexPageSize(int indexPageSize) { + this.indexPageSize = indexPageSize; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java?rev=591113&r1=591112&r2=591113&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java Thu Nov 1 11:37:28 2007 @@ -16,6 +16,9 @@ */ package org.apache.activemq.util; +import java.io.File; +import java.io.IOException; + /** * @version $Revision$ */ @@ -52,24 +55,71 @@ * @param name * @return */ - public static String toFileSystemSafeName( String name ) { - int size = name.length(); - StringBuffer rc = new StringBuffer(size*2); - for (int i = 0; i < size; i++) { - char c = name.charAt(i); - boolean valid = c >= 'a' && c <= 'z'; - valid = valid || (c >= 'A' && c <= 'Z'); - valid = valid || (c >= '0' && c <= '9'); - valid = valid || (c == '_') || (c == '-') || (c == '.') || (c == '/') || (c == '\\'); - - if( valid ) { - rc.append(c); - } else { - // Encode the character using hex notation - rc.append('#'); - rc.append(HexSupport.toHexFromInt(c, true)); - } - } - return rc.toString(); + public static String toFileSystemSafeName(String name) { + int size = name.length(); + StringBuffer rc = new StringBuffer(size * 2); + for (int i = 0; i < size; i++) { + char c = name.charAt(i); + boolean valid = c >= 'a' && c <= 'z'; + valid = valid || (c >= 'A' && c <= 'Z'); + valid = valid || (c >= '0' && c <= '9'); + valid = valid || (c == '_') || (c == '-') || (c == '.') + || (c == '/') || (c == '\\'); + + if (valid) { + rc.append(c); + } else { + // Encode the character using hex notation + rc.append('#'); + rc.append(HexSupport.toHexFromInt(c, true)); + } + } + return rc.toString(); + } + + public static boolean deleteFile(File fileToDelete) { + if (fileToDelete == null || !fileToDelete.exists()) { + return true; + } + boolean result = deleteChildren(fileToDelete); + result &= fileToDelete.delete(); + return result; } + + public static boolean deleteChildren(File parent) { + if (parent == null || !parent.exists()) { + return false; + } + boolean result = true; + if (parent.isDirectory()) { + File[] files = parent.listFiles(); + if (files == null) { + result = false; + } else { + for (int i = 0; i < files.length; i++) { + File file = files[i]; + if (file.getName().equals(".") + || file.getName().equals("..")) { + continue; + } + if (file.isDirectory()) { + result &= deleteFile(file); + } else { + result &= file.delete(); + } + } + } + } + + return result; + } + + + public static void moveFile(File src, File targetDirectory) throws IOException { + if (!src.renameTo(new File(targetDirectory, src.getName()))) { + throw new IOException("Failed to move " + src + " to " + targetDirectory); + } + } + + }