activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r591113 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: kaha/ kaha/impl/container/ kaha/impl/index/hash/ kaha/impl/index/tree/ store/amq/ store/kahadaptor/ util/
Date Thu, 01 Nov 2007 18:37:29 GMT
Author: rajdavies
Date: Thu Nov  1 11:37:28 2007
New Revision: 591113

URL: http://svn.apache.org/viewvc?rev=591113&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1485

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java?rev=591113&r1=591112&r2=591113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MapContainer.java Thu Nov  1 11:37:28 2007
@@ -222,4 +222,40 @@
      * @return the StoreEntry
      */
     StoreEntry getEntry(K key);
+    
+    /**
+     * Set the index bin size
+     * @param size
+     */
+    void setIndexBinSize(int size);
+    
+    /**
+     * @return index bin size
+     */
+    int getIndexBinSize();
+    
+    
+    /**
+     * Add the index key size
+     * @param size
+     */
+    void setIndexKeySize(int size);
+    
+    
+    /**
+     * @return the index key size
+     */
+    int getIndexKeySize();
+    
+   
+    /**
+     * Set the index page size
+     * @param size
+     */
+    void setIndexPageSize(int size);
+    
+    /**
+     * @return the index page size
+     */
+    int getIndexPageSize();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?rev=591113&r1=591112&r2=591113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Thu Nov  1 11:37:28 2007
@@ -52,6 +52,9 @@
     protected Marshaller keyMarshaller = Store.OBJECT_MARSHALLER;
     protected Marshaller valueMarshaller = Store.OBJECT_MARSHALLER;
     protected File directory;
+    private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
+    private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
+    private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
 
     public MapContainerImpl(File directory, ContainerId id, IndexItem root, IndexManager indexManager,
                             DataManager dataManager, boolean persistentIndex) {
@@ -66,7 +69,11 @@
                 String name = containerId.getDataContainerName() + "_" + containerId.getKey();
                 name = name.replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_");
                 try {
-                    this.index = new HashIndex(directory, name, indexManager);
+                    HashIndex hashIndex = new HashIndex(directory, name, indexManager);
+                    hashIndex.setNumberOfBins(getIndexBinSize());
+                    hashIndex.setKeySize(getIndexKeySize());
+                    hashIndex.setPageSize(getIndexPageSize());
+                    this.index = hashIndex;
                 } catch (IOException e) {
                     LOG.error("Failed to create HashIndex", e);
                     throw new RuntimeException(e);
@@ -527,4 +534,30 @@
         }
         return index;
     }
+
+    public int getIndexBinSize() {
+        return indexBinSize;
+    }
+
+    public void setIndexBinSize(int indexBinSize) {
+        this.indexBinSize = indexBinSize;
+    }
+
+    public int getIndexKeySize() {
+        return indexKeySize;
+    }
+
+    public void setIndexKeySize(int indexKeySize) {
+        this.indexKeySize = indexKeySize;
+    }
+
+    public int getIndexPageSize() {
+        return indexPageSize;
+    }
+
+    public void setIndexPageSize(int indexPageSize) {
+        this.indexPageSize = indexPageSize;
+    }
+
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java?rev=591113&r1=591112&r2=591113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java Thu Nov  1 11:37:28 2007
@@ -37,10 +37,10 @@
  * @version $Revision: 1.1.1.1 $
  */
 public class HashIndex implements Index {
-
+    public static final int DEFAULT_PAGE_SIZE;
+    public static final int DEFAULT_KEY_SIZE;
+    public static final int DEFAULT_BIN_SIZE;
     private static final String NAME_PREFIX = "hash-index-";
-    private static final int DEFAULT_PAGE_SIZE;
-    private static final int DEFAULT_KEY_SIZE;
     private static final Log LOG = LogFactory.getLog(HashIndex.class);
     private final String name;
     private File directory;
@@ -49,6 +49,7 @@
     private IndexManager indexManager;
     private int pageSize = DEFAULT_PAGE_SIZE;
     private int keySize = DEFAULT_KEY_SIZE;
+    private int numberOfBins = DEFAULT_BIN_SIZE;
     private int keysPerPage = pageSize / keySize;
     private DataByteArrayInputStream dataIn;
     private DataByteArrayOutputStream dataOut;
@@ -60,21 +61,10 @@
     private HashPage lastFree;
     private AtomicBoolean loaded = new AtomicBoolean();
     private LRUCache<Long, HashPage> pageCache;
-    private boolean enablePageCaching;
-    private int pageCacheSize = 10;
-
-    /**
-     * Constructor
-     * 
-     * @param directory
-     * @param name
-     * @param indexManager
-     * @throws IOException
-     */
-    public HashIndex(File directory, String name, IndexManager indexManager) throws IOException {
-        this(directory, name, indexManager, 1024);
-    }
+    private boolean enablePageCaching=true;
+    private int pageCacheSize = 1;
 
+    
     /**
      * Constructor
      * 
@@ -84,15 +74,9 @@
      * @param numberOfBins
      * @throws IOException
      */
-    public HashIndex(File directory, String name, IndexManager indexManager, int numberOfBins) throws IOException {
+    public HashIndex(File directory, String name, IndexManager indexManager) throws IOException {
         this.directory = directory;
         this.name = name;
-        this.indexManager = indexManager;
-        int capacity = 1;
-        while (capacity < numberOfBins) {
-            capacity <<= 1;
-        }
-        this.bins = new HashBin[capacity];
         openIndexFile();
         pageCache = new LRUCache<Long, HashPage>(pageCacheSize, pageCacheSize, 0.75f, true);
     }
@@ -139,6 +123,23 @@
         }
         this.pageSize = pageSize;
     }
+    
+    /**
+     * @return number of bins
+     */
+    public int getNumberOfBins() {
+        return this.numberOfBins;
+    }
+
+    /**
+     * @param numberOfBins
+     */
+    public void setNumberOfBins(int numberOfBins) {
+        if (loaded.get() && numberOfBins != this.numberOfBins) {
+            throw new RuntimeException("Pages already loaded - can't reset bin size");
+        }
+        this.numberOfBins = numberOfBins;
+    }
 
     /**
      * @return the enablePageCaching
@@ -175,6 +176,12 @@
 
     public synchronized void load() {
         if (loaded.compareAndSet(false, true)) {
+            this.indexManager = indexManager;
+            int capacity = 1;
+            while (capacity < numberOfBins) {
+                capacity <<= 1;
+            }
+            this.bins = new HashBin[capacity];
             keysPerPage = pageSize / keySize;
             dataIn = new DataByteArrayInputStream();
             dataOut = new DataByteArrayOutputStream(pageSize);
@@ -439,5 +446,6 @@
     static {
         DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "16384"));
         DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
+        DEFAULT_BIN_SIZE= Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java?rev=591113&r1=591112&r2=591113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeIndex.java Thu Nov  1 11:37:28 2007
@@ -85,9 +85,6 @@
      * @param marshaller
      */
     public void setKeyMarshaller(Marshaller marshaller) {
-        if (loaded.get()) {
-            throw new RuntimeException("Pages already loaded - can't set marshaller now");
-        }
         this.keyMarshaller = marshaller;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=591113&r1=591112&r2=591113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Thu Nov  1 11:37:28 2007
@@ -44,6 +44,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
 import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.kaha.impl.index.hash.HashIndex;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -105,6 +106,9 @@
     private boolean persistentIndex=true;
     private boolean useNio = true;
     private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
+    private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
+    private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
+    private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
     private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>> ();
 
 
@@ -628,6 +632,9 @@
     protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
         KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
         adaptor.setPersistentIndex(isPersistentIndex());
+        adaptor.setIndexBinSize(getIndexBinSize());
+        adaptor.setIndexKeySize(getIndexKeySize());
+        adaptor.setIndexPageSize(getIndexPageSize());
         return adaptor;
     }
 
@@ -756,6 +763,31 @@
     public void setCheckpointInterval(long checkpointInterval) {
         this.checkpointInterval = checkpointInterval;
     }
+    
+    public int getIndexBinSize() {
+        return indexBinSize;
+    }
+
+    public void setIndexBinSize(int indexBinSize) {
+        this.indexBinSize = indexBinSize;
+    }
+
+    public int getIndexKeySize() {
+        return indexKeySize;
+    }
+
+    public void setIndexKeySize(int indexKeySize) {
+        this.indexKeySize = indexKeySize;
+    }
+
+    public int getIndexPageSize() {
+        return indexPageSize;
+    }
+
+    public void setIndexPageSize(int indexPageSize) {
+        this.indexPageSize = indexPageSize;
+    }
+
 	
 	protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
 	    Set<Integer>set = dataFilesInProgress.get(store);
@@ -771,7 +803,5 @@
         if (set != null) {
             set.remove(dataFileId);
         }
-    }
-
-    
+    }    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=591113&r1=591112&r2=591113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Thu Nov  1 11:37:28 2007
@@ -39,6 +39,7 @@
 import org.apache.activemq.kaha.MessageIdMarshaller;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreFactory;
+import org.apache.activemq.kaha.impl.index.hash.HashIndex;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.ReferenceStore;
 import org.apache.activemq.store.ReferenceStoreAdapter;
@@ -63,6 +64,9 @@
     private boolean storeValid;
     private Store stateStore;
     private boolean persistentIndex = true;
+    private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
+    private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
+    private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
 
     public KahaReferenceStoreAdapter(AtomicLong size){
         super(size);
@@ -176,7 +180,10 @@
                                                                                 String containerName)
         throws IOException {
         Store store = getStore();
-        MapContainer<MessageId, ReferenceRecord> container = store.getMapContainer(id, containerName);
+        MapContainer<MessageId, ReferenceRecord> container = store.getMapContainer(id, containerName,persistentIndex);
+        container.setIndexBinSize(getIndexBinSize());
+        container.setIndexKeySize(getIndexKeySize());
+        container.setIndexPageSize(getIndexPageSize());
         container.setKeyMarshaller(new MessageIdMarshaller());
         container.setValueMarshaller(new ReferenceRecordMarshaller());
         container.load();
@@ -306,6 +313,30 @@
 
     protected void removeSubscriberState(SubscriptionInfo info) {
         durableSubscribers.remove(info);
+    }
+
+    public int getIndexBinSize() {
+        return indexBinSize;
+    }
+
+    public void setIndexBinSize(int indexBinSize) {
+        this.indexBinSize = indexBinSize;
+    }
+
+    public int getIndexKeySize() {
+        return indexKeySize;
+    }
+
+    public void setIndexKeySize(int indexKeySize) {
+        this.indexKeySize = indexKeySize;
+    }
+
+    public int getIndexPageSize() {
+        return indexPageSize;
+    }
+
+    public void setIndexPageSize(int indexPageSize) {
+        this.indexPageSize = indexPageSize;
     }
 
 	

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java?rev=591113&r1=591112&r2=591113&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java Thu Nov  1 11:37:28 2007
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.util;
 
+import java.io.File;
+import java.io.IOException;
+
 /**
  * @version $Revision$
  */
@@ -52,24 +55,71 @@
      * @param name
      * @return
      */
-    public static String toFileSystemSafeName( String name ) {
-    	int size = name.length();
-    	StringBuffer rc = new StringBuffer(size*2);
-    	for (int i = 0; i < size; i++) {
-			char c = name.charAt(i);
-			boolean valid = c >= 'a' && c <= 'z';
-			valid = valid || (c >= 'A' && c <= 'Z');
-			valid = valid || (c >= '0' && c <= '9');
-			valid = valid || (c == '_') || (c == '-') || (c == '.') || (c == '/') || (c == '\\');
-			
-			if(  valid ) {
-				rc.append(c);
-			} else {
-				// Encode the character using hex notation
-				rc.append('#');
-				rc.append(HexSupport.toHexFromInt(c, true));
-			}
-		}
-    	return rc.toString();
+    public static String toFileSystemSafeName(String name) {
+        int size = name.length();
+        StringBuffer rc = new StringBuffer(size * 2);
+        for (int i = 0; i < size; i++) {
+            char c = name.charAt(i);
+            boolean valid = c >= 'a' && c <= 'z';
+            valid = valid || (c >= 'A' && c <= 'Z');
+            valid = valid || (c >= '0' && c <= '9');
+            valid = valid || (c == '_') || (c == '-') || (c == '.')
+                    || (c == '/') || (c == '\\');
+
+            if (valid) {
+                rc.append(c);
+            } else {
+                // Encode the character using hex notation
+                rc.append('#');
+                rc.append(HexSupport.toHexFromInt(c, true));
+            }
+        }
+        return rc.toString();
+    }
+
+    public static boolean deleteFile(File fileToDelete) {
+        if (fileToDelete == null || !fileToDelete.exists()) {
+            return true;
+        }
+        boolean result = deleteChildren(fileToDelete);
+        result &= fileToDelete.delete();
+        return result;
     }
+    
+    public static boolean deleteChildren(File parent) {
+        if (parent == null || !parent.exists()) {
+            return false;
+        }
+        boolean result = true;
+        if (parent.isDirectory()) {
+            File[] files = parent.listFiles();
+            if (files == null) {
+                result = false;
+            } else {
+                for (int i = 0; i < files.length; i++) {
+                    File file = files[i];
+                    if (file.getName().equals(".")
+                            || file.getName().equals("..")) {
+                        continue;
+                    }
+                    if (file.isDirectory()) {
+                        result &= deleteFile(file);
+                    } else {
+                        result &= file.delete();
+                    }
+                }
+            }
+        }
+       
+        return result;
+    }
+    
+    
+    public static void moveFile(File src, File targetDirectory) throws IOException {
+        if (!src.renameTo(new File(targetDirectory, src.getName()))) {
+            throw new IOException("Failed to move " + src + " to " + targetDirectory);
+        }
+    }
+
+   
 }



Mime
View raw message