Author: rajdavies
Date: Mon Nov 20 05:11:26 2006
New Revision: 477166
URL: http://svn.apache.org/viewvc?view=rev&rev=477166
Log:
fixed some timing issues using the Kaha Persistence Adaptor
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Mon Nov 20 05:11:26 2006
@@ -89,8 +89,8 @@
synchronized public int incrementReferenceCount() {
int rc = ++referenceCount;
- if( persistent && rc==1 ) {
- assert message == null;
+ if( persistent && rc==1 && message == null) {
+
try {
message = destinationStore.getMessage(messageId);
if( message == null ) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java Mon Nov 20 05:11:26 2006
@@ -240,4 +240,9 @@
* @see org.apache.activemq.kaha.IndexTypes
*/
public void setIndexType(String type);
+
+ /**
+ * @return true if the store has been initialized
+ */
+ public boolean isInitialized();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Mon Nov 20 05:11:26 2006
@@ -20,13 +20,13 @@
import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.kaha.IndexTypes;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
@@ -45,7 +45,6 @@
import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.ConcurrentHashMap;
/**
* Store Implementation
@@ -74,7 +73,7 @@
private String mode;
private boolean initialized;
private boolean logIndexChanges=false;
- private boolean useAsyncWriter=true;
+ private boolean useAsyncWriter=false;
private long maxDataFileLength=DataManager.MAX_FILE_LENGTH;
private FileLock lock;
private String indexType=IndexTypes.DISK_INDEX;
@@ -154,17 +153,21 @@
}
}
}
- log.info("Kaha Store deleted data directory "+directory);
+ String str=result?"successfully deleted":"failed to delete";
+ log.info("Kaha Store "+str+" data directory "+directory);
}
- initialized=false;
return result;
}
+
+ public synchronized boolean isInitialized(){
+ return initialized;
+ }
public boolean doesMapContainerExist(Object id) throws IOException{
return doesMapContainerExist(id,DEFAULT_CONTAINER_NAME);
}
- public boolean doesMapContainerExist(Object id,String containerName) throws IOException{
+ public synchronized boolean doesMapContainerExist(Object id,String containerName) throws IOException{
initialize();
ContainerId containerId=new ContainerId();
containerId.setKey(id);
@@ -204,7 +207,7 @@
deleteMapContainer(id,DEFAULT_CONTAINER_NAME);
}
- public void deleteMapContainer(Object id,String containerName) throws IOException{
+ public synchronized void deleteMapContainer(Object id,String containerName) throws IOException{
initialize();
ContainerId containerId=new ContainerId();
containerId.setKey(id);
@@ -216,7 +219,7 @@
}
}
- public Set getMapContainerIds() throws IOException{
+ public synchronized Set getMapContainerIds() throws IOException{
initialize();
Set set = new HashSet();
for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
@@ -230,7 +233,7 @@
return doesListContainerExist(id,DEFAULT_CONTAINER_NAME);
}
- public boolean doesListContainerExist(Object id,String containerName) throws IOException{
+ public synchronized boolean doesListContainerExist(Object id,String containerName) throws IOException{
initialize();
ContainerId containerId=new ContainerId();
containerId.setKey(id);
@@ -271,7 +274,7 @@
deleteListContainer(id,DEFAULT_CONTAINER_NAME);
}
- public void deleteListContainer(Object id,String containerName) throws IOException{
+ public synchronized void deleteListContainer(Object id,String containerName) throws IOException{
initialize();
ContainerId containerId=new ContainerId();
containerId.setKey(id);
@@ -283,7 +286,7 @@
}
}
- public Set getListContainerIds() throws IOException{
+ public synchronized Set getListContainerIds() throws IOException{
initialize();
Set set = new HashSet();
for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
@@ -310,7 +313,7 @@
return this.mapsContainer;
}
- public DataManager getDataManager(String name) throws IOException{
+ public synchronized DataManager getDataManager(String name) throws IOException{
DataManager dm=(DataManager)dataManagers.get(name);
if(dm==null){
dm=new DataManager(directory,name);
@@ -322,7 +325,7 @@
return dm;
}
- public IndexManager getIndexManager(DataManager dm,String name) throws IOException{
+ public synchronized IndexManager getIndexManager(DataManager dm,String name) throws IOException{
IndexManager im=(IndexManager)indexManagers.get(name);
if(im==null){
im=new IndexManager(directory,name,mode,logIndexChanges?dm:null);
@@ -343,18 +346,18 @@
});
}
- public boolean isLogIndexChanges(){
+ public synchronized boolean isLogIndexChanges(){
return logIndexChanges;
}
- public void setLogIndexChanges(boolean logIndexChanges){
+ public synchronized void setLogIndexChanges(boolean logIndexChanges){
this.logIndexChanges=logIndexChanges;
}
/**
* @return the maxDataFileLength
*/
- public long getMaxDataFileLength(){
+ public synchronized long getMaxDataFileLength(){
return maxDataFileLength;
}
@@ -362,7 +365,7 @@
* @param maxDataFileLength
* the maxDataFileLength to set
*/
- public void setMaxDataFileLength(long maxDataFileLength){
+ public synchronized void setMaxDataFileLength(long maxDataFileLength){
this.maxDataFileLength=maxDataFileLength;
}
@@ -370,7 +373,7 @@
* @see org.apache.activemq.kaha.IndexTypes
* @return the default index type
*/
- public String getIndexType(){
+ public synchronized String getIndexType(){
return indexType;
}
@@ -380,7 +383,7 @@
* @param type
* @see org.apache.activemq.kaha.IndexTypes
*/
- public void setIndexType(String type){
+ public synchronized void setIndexType(String type){
if(type==null||(!type.equals(IndexTypes.DISK_INDEX)&&!type.equals(IndexTypes.IN_MEMORY_INDEX))){
throw new RuntimeException("Unknown IndexType: "+type);
}
@@ -444,7 +447,7 @@
}
}
- private void unlock() throws IOException{
+ private synchronized void unlock() throws IOException{
if(!disableLocking&&directory!=null){
Set set=getVmLockSet();
synchronized(set){
@@ -485,11 +488,11 @@
* scans the directory and builds up the IndexManager and DataManager
* @throws IOException
*/
- private void generateInterestInListDataFiles() throws IOException {
- for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
- ContainerId id = (ContainerId)i.next();
- DataManager dm = getDataManager(id.getDataContainerName());
- IndexManager im = getIndexManager(dm,id.getDataContainerName());
+ private void generateInterestInListDataFiles() throws IOException{
+ for(Iterator i=listsContainer.getKeys().iterator();i.hasNext();){
+ ContainerId id=(ContainerId)i.next();
+ DataManager dm=getDataManager(id.getDataContainerName());
+ IndexManager im=getIndexManager(dm,id.getDataContainerName());
IndexItem theRoot=listsContainer.getRoot(im,id);
long nextItem=theRoot.getNextItem();
while(nextItem!=Item.POSITION_NOT_SET){
@@ -499,13 +502,13 @@
dm.addInterestInFile(item.getValueFile());
nextItem=item.getNextItem();
}
-
}
}
/**
* scans the directory and builds up the IndexManager and DataManager
- * @throws IOException
+ *
+ * @throws IOException
*/
private void generateInterestInMapDataFiles() throws IOException {
for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java Mon Nov 20 05:11:26 2006
@@ -305,7 +305,7 @@
return result;
}
- protected void remove(IndexItem item){
+ protected synchronized void remove(IndexItem item){
IndexItem prev=indexList.getPrevEntry(item);
IndexItem next=indexList.getNextEntry(item);
indexList.remove(item);
@@ -424,7 +424,7 @@
*
* @see java.util.List#set(int, E)
*/
- public Object set(int index,Object element){
+ public synchronized Object set(int index,Object element){
load();
Object result=null;
IndexItem replace=indexList.isEmpty()?null:(IndexItem)indexList.get(index);
@@ -438,7 +438,7 @@
return result;
}
- protected IndexItem internalSet(int index,Object element){
+ protected synchronized IndexItem internalSet(int index,Object element){
IndexItem replace=indexList.isEmpty()?null:(IndexItem)indexList.get(index);
IndexItem prev=(indexList.isEmpty()||(index-1)<0)?null:(IndexItem)indexList.get(index-1);
IndexItem next=(indexList.isEmpty()||(index+1)>=size())?null:(IndexItem)indexList.get(index+1);
@@ -460,7 +460,7 @@
itemAdded(item,index,element);
}
- protected StoreEntry internalAddLast(Object o){
+ protected synchronized StoreEntry internalAddLast(Object o){
load();
IndexItem item=writeLast(o);
indexList.addLast(item);
@@ -468,7 +468,7 @@
return item;
}
- protected StoreEntry internalAddFirst(Object o){
+ protected synchronized StoreEntry internalAddFirst(Object o){
load();
IndexItem item=writeFirst(o);
indexList.addFirst(item);
@@ -476,7 +476,7 @@
return item;
}
- protected IndexItem internalAdd(int index,Object element){
+ protected synchronized IndexItem internalAdd(int index,Object element){
load();
IndexItem item=insert(index,element);
indexList.add(index,item);
@@ -484,7 +484,7 @@
return item;
}
- protected StoreEntry internalGet(int index){
+ protected synchronized StoreEntry internalGet(int index){
load();
if(index>=0&&index<indexList.size()){
return indexList.get(index);
@@ -646,7 +646,7 @@
* @param object
* @see org.apache.activemq.kaha.ListContainer#update(org.apache.activemq.kaha.StoreEntry, java.lang.Object)
*/
- public void update(StoreEntry entry,Object object){
+ public synchronized void update(StoreEntry entry,Object object){
try{
dataManager.updateItem(entry.getValueDataItem(),marshaller,object);
}catch(IOException e){
@@ -733,7 +733,7 @@
return indexList.getEntry(entry);
}
- protected IndexItem writeLast(Object value){
+ protected synchronized IndexItem writeLast(Object value){
IndexItem index=null;
try{
if(value!=null){
@@ -760,7 +760,7 @@
return index;
}
- protected IndexItem writeFirst(Object value){
+ protected synchronized IndexItem writeFirst(Object value){
IndexItem index=null;
try{
if(value!=null){
@@ -786,7 +786,7 @@
return index;
}
- protected IndexItem insert(int insertPos,Object value){
+ protected synchronized IndexItem insert(int insertPos,Object value){
IndexItem index=null;
try{
if(value!=null){
@@ -823,7 +823,7 @@
return index;
}
- protected Object getValue(StoreEntry item){
+ protected synchronized Object getValue(StoreEntry item){
Object result=null;
if(item!=null){
try{
@@ -858,7 +858,7 @@
return result.toString();
}
- protected void itemAdded(IndexItem item,int pos,Object value){
+ protected synchronized void itemAdded(IndexItem item,int pos,Object value){
if(cacheEnabled){
int cachePosition=pos-offset;
// if pos is before the cache offset
@@ -882,7 +882,7 @@
}
}
- protected void itemRemoved(int pos){
+ protected synchronized void itemRemoved(int pos){
if(cacheEnabled){
int lastPosition=offset+cacheList.size()-1;
int cachePosition=pos-offset;
@@ -900,7 +900,7 @@
}
}
- protected Object getCachedItem(int pos){
+ protected synchronized Object getCachedItem(int pos){
Object result=null;
if(cacheEnabled) {
int cachePosition=pos-offset;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Mon Nov 20 05:11:26 2006
@@ -293,7 +293,7 @@
return result;
}
- protected void remove(IndexItem item){
+ protected synchronized void remove(IndexItem item){
Object key=getKey(item);
if(key!=null){
remove(key);
@@ -321,7 +321,7 @@
* @param value
* @return the StoreEntry associated with the entry
*/
- public StoreEntry place(Object key, Object value) {
+ public synchronized StoreEntry place(Object key, Object value) {
load();
if(indexMap.containsKey(key)){
remove(key);
@@ -336,7 +336,7 @@
* Remove an Entry from ther Map
* @param entry
*/
- public void remove(StoreEntry entry) {
+ public synchronized void remove(StoreEntry entry) {
load();
IndexItem item=(IndexItem)entry;
if(item!=null){
@@ -393,7 +393,7 @@
}
- protected Set getInternalKeySet(){
+ protected synchronized Set getInternalKeySet(){
return new HashSet(indexMap.keySet());
}
@@ -401,7 +401,7 @@
return indexList;
}
- protected IndexItem write(Object key,Object value){
+ protected synchronized IndexItem write(Object key,Object value){
IndexItem index=null;
try{
if(key!=null){
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java Mon Nov 20 05:11:26 2006
@@ -54,7 +54,7 @@
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem)
*/
- public byte readDataItemSize(DataItem item) throws IOException {
+ public synchronized byte readDataItemSize(DataItem item) throws IOException {
WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(item));
if( asyncWrite!= null ) {
item.setSize(asyncWrite.location.getSize());
@@ -73,7 +73,7 @@
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller, org.apache.activemq.kaha.StoreLocation)
*/
- public Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
+ public synchronized Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(item));
if( asyncWrite!= null ) {
ByteArrayInputStream stream = new ByteArrayInputStream(asyncWrite.data, DataManager.ITEM_HEAD_SIZE, item.getSize());
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java Mon Nov 20 05:11:26 2006
@@ -135,7 +135,7 @@
* @return
* @throws IOException
*/
- public DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
+ public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
// We may need to slow down if we are pounding the async thread too
// hard..
try {
@@ -177,37 +177,36 @@
/**
*
*/
- public void updateItem(final DataItem item, Marshaller marshaller, Object payload, byte type) throws IOException {
- // We may need to slow down if we are pounding the async thread too
- // hard..
- try {
- usage.waitForSpace();
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
-
- //Write the packet our internal buffer.
- final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
- buffer.position(DataManager.ITEM_HEAD_SIZE);
- marshaller.writePayload(payload,buffer);
- final int size=buffer.size();
- int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
- buffer.reset();
- buffer.writeByte(type);
- buffer.writeInt(payloadSize);
- item.setSize(payloadSize);
- final DataFile dataFile = dataManager.getDataFile(item);
-
- usage.increaseUsage(size);
-
- WriteCommand write = new WriteCommand(item, dataFile.getRandomAccessFile(), buffer.getData(), latchAssignedToNewWrites);
-
- // Equeue the write to an async thread.
- synchronized(enqueueMutex) {
- dataFile.setWriterData(latchAssignedToNewWrites);
- enqueue(write);
+ public void updateItem(final DataItem item,Marshaller marshaller,Object payload,byte type) throws IOException{
+ // We may need to slow down if we are pounding the async thread too
+ // hard..
+ try{
+ usage.waitForSpace();
+ }catch(InterruptedException e){
+ throw new InterruptedIOException();
+ }
+ synchronized(enqueueMutex){
+ // Write the packet our internal buffer.
+ final DataByteArrayOutputStream buffer=new DataByteArrayOutputStream();
+ buffer.position(DataManager.ITEM_HEAD_SIZE);
+ marshaller.writePayload(payload,buffer);
+ final int size=buffer.size();
+ int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
+ buffer.reset();
+ buffer.writeByte(type);
+ buffer.writeInt(payloadSize);
+ item.setSize(payloadSize);
+ final DataFile dataFile=dataManager.getDataFile(item);
+ usage.increaseUsage(size);
+ WriteCommand write=new WriteCommand(item,dataFile.getRandomAccessFile(),buffer.getData(),
+ latchAssignedToNewWrites);
+ // Equeue the write to an async thread.
+ synchronized(enqueueMutex){
+ dataFile.setWriterData(latchAssignedToNewWrites);
+ enqueue(write);
+ }
+ inflightWrites.put(new WriteKey(item),write);
}
- inflightWrites.put(new WriteKey(item), write);
}
private void enqueue(Object command) throws IOException {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java Mon Nov 20 05:11:26 2006
@@ -97,7 +97,7 @@
return name;
}
- DataFile findSpaceForData(DataItem item) throws IOException{
+ synchronized DataFile findSpaceForData(DataItem item) throws IOException{
if(currentWriteFile==null||((currentWriteFile.getLength()+item.getSize())>maxFileLength)){
int nextNum=currentWriteFile!=null?currentWriteFile.getNumber().intValue()+1:1;
if(currentWriteFile!=null&&currentWriteFile.isUnused()){
@@ -125,15 +125,15 @@
return getReader().readItem(marshaller,item);
}
- public StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException{
+ public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException{
return getWriter().storeItem(marshaller,payload, DATA_ITEM_TYPE);
}
- public StoreLocation storeRedoItem(Object payload) throws IOException{
+ public synchronized StoreLocation storeRedoItem(Object payload) throws IOException{
return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
}
- public void updateItem(StoreLocation location,Marshaller marshaller, Object payload) throws IOException {
+ public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object payload) throws IOException {
getWriter().updateItem((DataItem)location,marshaller,payload,DATA_ITEM_TYPE);
}
@@ -296,7 +296,7 @@
}
return reader;
}
- protected DataFileReader createReader() {
+ protected synchronized DataFileReader createReader() {
if( useAsyncWriter ) {
return new AsyncDataFileReader(this, (AsyncDataFileWriter) getWriter());
} else {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java Mon Nov 20 05:11:26 2006
@@ -45,7 +45,7 @@
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem)
*/
- public byte readDataItemSize(DataItem item) throws IOException {
+ public synchronized byte readDataItemSize(DataItem item) throws IOException {
RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
file.seek(item.getOffset()); // jump to the size field
byte rc = file.readByte();
@@ -56,7 +56,7 @@
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller, org.apache.activemq.kaha.StoreLocation)
*/
- public Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
+ public synchronized Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
RandomAccessFile file=dataManager.getDataFile(item).getRandomAccessFile();
// TODO: we could reuse the buffer in dataIn if it's big enough to avoid
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java Mon Nov 20 05:11:26 2006
@@ -24,7 +24,7 @@
*
* @version $Revision$
*/
-public class DiskIndexLinkedList implements IndexLinkedList{
+public class DiskIndexLinkedList implements IndexLinkedList{
protected IndexManager indexManager;
protected transient IndexItem root;
protected transient IndexItem last;
@@ -33,12 +33,12 @@
/**
* Constructs an empty list.
*/
- public DiskIndexLinkedList(IndexManager im,IndexItem header){
+ public DiskIndexLinkedList(IndexManager im,IndexItem header){
this.indexManager=im;
this.root=header;
}
- public IndexItem getRoot(){
+ public synchronized IndexItem getRoot(){
return root;
}
@@ -51,7 +51,7 @@
*
* @return the first element in this list.
*/
- public IndexItem getFirst(){
+ public synchronized IndexItem getFirst(){
if(size==0)
return null;
return getNextEntry(root);
@@ -62,7 +62,7 @@
*
* @return the last element in this list.
*/
- public IndexItem getLast(){
+ public synchronized IndexItem getLast(){
if(size==0)
return null;
return last;
@@ -73,7 +73,7 @@
*
* @return the first element from this list.
*/
- public StoreEntry removeFirst(){
+ public synchronized StoreEntry removeFirst(){
if(size==0){
return null;
}
@@ -87,7 +87,7 @@
*
* @return the last element from this list.
*/
- public Object removeLast(){
+ public synchronized Object removeLast(){
if(size==0)
return null;
StoreEntry result=last;
@@ -100,7 +100,7 @@
*
* @param o the element to be inserted at the beginning of this list.
*/
- public void addFirst(IndexItem item){
+ public synchronized void addFirst(IndexItem item){
if(size==0){
last=item;
}
@@ -113,7 +113,7 @@
*
* @param o the element to be inserted at the end of this list.
*/
- public void addLast(IndexItem item){
+ public synchronized void addLast(IndexItem item){
size++;
last=item;
}
@@ -123,7 +123,7 @@
*
* @return the number of elements in this list.
*/
- public int size(){
+ public synchronized int size(){
return size;
}
@@ -132,7 +132,7 @@
*
* @return true if there are no elements in the list
*/
- public boolean isEmpty(){
+ public synchronized boolean isEmpty(){
return size==0;
}
@@ -142,7 +142,7 @@
* @param o element to be appended to this list.
* @return <tt>true</tt> (as per the general contract of <tt>Collection.add</tt>).
*/
- public boolean add(IndexItem item){
+ public synchronized boolean add(IndexItem item){
addLast(item);
return true;
}
@@ -150,7 +150,7 @@
/**
* Removes all of the elements from this list.
*/
- public void clear(){
+ public synchronized void clear(){
last=null;
size=0;
}
@@ -164,7 +164,7 @@
*
* @throws IndexOutOfBoundsException if the specified index is is out of range (<tt>index < 0 || index >= size()</tt>).
*/
- public IndexItem get(int index){
+ public synchronized IndexItem get(int index){
return entry(index);
}
@@ -177,7 +177,7 @@
*
* @throws IndexOutOfBoundsException if the specified index is out of range (<tt>index < 0 || index > size()</tt>).
*/
- public void add(int index,IndexItem element){
+ public synchronized void add(int index,IndexItem element){
if(index==size-1){
last=element;
}
@@ -193,7 +193,7 @@
*
* @throws IndexOutOfBoundsException if the specified index is out of range (<tt>index < 0 || index >= size()</tt>).
*/
- public Object remove(int index){
+ public synchronized Object remove(int index){
IndexItem e=entry(index);
remove(e);
return e;
@@ -225,7 +225,7 @@
* @return the index in this list of the first occurrence of the specified element, or -1 if the list does not
* contain this element.
*/
- public int indexOf(StoreEntry o){
+ public synchronized int indexOf(StoreEntry o){
int index=0;
if(size>0){
for(IndexItem e=getNextEntry(root);e!=null;e=getNextEntry(e)){
@@ -244,7 +244,7 @@
* @param entry
* @return next entry
*/
- public IndexItem getNextEntry(IndexItem current){
+ public synchronized IndexItem getNextEntry(IndexItem current){
IndexItem result=null;
if(current!=null&&current.getNextItem()>=0){
try{
@@ -266,7 +266,7 @@
* @param entry
* @return prev entry
*/
- public IndexItem getPrevEntry(IndexItem current){
+ public synchronized IndexItem getPrevEntry(IndexItem current){
IndexItem result=null;
if(current!=null&&current.getPreviousItem()>=0){
try{
@@ -282,7 +282,7 @@
return result;
}
- public StoreEntry getEntry(StoreEntry current){
+ public synchronized StoreEntry getEntry(StoreEntry current){
StoreEntry result=null;
if(current != null && current.getOffset() >= 0){
try{
@@ -302,7 +302,7 @@
* Update the indexes of a StoreEntry
* @param current
*/
- public StoreEntry refreshEntry(StoreEntry current){
+ public synchronized StoreEntry refreshEntry(StoreEntry current){
StoreEntry result=null;
if(current != null && current.getOffset() >= 0){
try{
@@ -318,7 +318,7 @@
return result;
}
- public void remove(IndexItem e){
+ public synchronized void remove(IndexItem e){
if(e==root||e.equals(root))
return;
if(e==last||e.equals(last)){
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java Mon Nov 20 05:11:26 2006
@@ -81,7 +81,11 @@
}
public synchronized void updateIndexes(IndexItem index) throws IOException{
+ try {
writer.updateIndexes(index);
+ }catch(Throwable e) {
+ log.error(name + " GORT ERROR! ",e);
+ }
}
public synchronized void redo(final RedoStoreIndexItem redo) throws IOException{
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java Mon Nov 20 05:11:26 2006
@@ -30,6 +30,7 @@
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.store.journal.JournalPersistenceAdapter;
import org.apache.activemq.store.journal.QuickJournalPersistenceAdapter;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -67,7 +68,9 @@
if( useQuickJournal ) {
return new QuickJournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
} else {
+ KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("amqstore"));
return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
+ //return new JournalPersistenceAdapter(getJournal(), adaptor, getTaskRunnerFactory());
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Mon Nov 20 05:11:26 2006
@@ -84,12 +84,6 @@
public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,
String subscriptionName) throws SQLException,IOException;
- public void doGetPrevDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
- String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception;
-
- public void doGetNextDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
- String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception;
-
public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Mon Nov 20 05:11:26 2006
@@ -65,7 +65,6 @@
private String nextDurableSubscriberMessageStatement;
private String durableSubscriberMessageCountStatement;
private String nextDurableSubscriberMessageIdStatement;
- private String prevDurableSubscriberMessageIdStatement;
private String destinationMessageCountStatement;
private String findNextMessagesStatement;
private boolean useLockCreateWhereClause;
@@ -253,46 +252,7 @@
return durableSubscriberMessageCountStatement;
}
- /**
- * @return the nextDurableSubscriberMessageIdStatement
- */
- public String getNextDurableSubscriberMessageIdStatement(){
- if (nextDurableSubscriberMessageIdStatement==null) {
- nextDurableSubscriberMessageIdStatement =
- "SELECT M.ID FROM " + getFullMessageTableName() + " M, "
- + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
- + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID ";
- }
- return nextDurableSubscriberMessageIdStatement;
- }
-
- /**
- * @return the prevDurableSubscriberMessageIdStatement
- */
- /*
- public String getPrevDurableSubscriberMessageIdStatement(){
- if(prevDurableSubscriberMessageIdStatement==null) {
- prevDurableSubscriberMessageIdStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
- + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
- + " AND M.CONTAINER=D.CONTAINER AND M.ID < ?" + " ORDER BY M.ID ";
- }
- return prevDurableSubscriberMessageIdStatement;
- }
- */
-
-
- public String getPrevDurableSubscriberMessageIdStatement(){
- if(prevDurableSubscriberMessageIdStatement==null) {
- prevDurableSubscriberMessageIdStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M "
- + " WHERE M.CONTAINER=? "
- + " AND M.ID <?" + " ORDER BY M.ID DESC ";
- }
- return prevDurableSubscriberMessageIdStatement;
- }
-
-
-
- public String getFindAllDestinationsStatement() {
+ public String getFindAllDestinationsStatement() {
if (findAllDestinationsStatement == null) {
findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName();
}
@@ -643,15 +603,7 @@
this.nextDurableSubscriberMessageIdStatement=nextDurableSubscriberMessageIdStatement;
}
-
-
-
- /**
- * @param prevDurableSubscriberMessageIdStatement the prevDurableSubscriberMessageIdStatement to set
- */
- public void setPrevDurableSubscriberMessageIdStatement(String prevDurableSubscriberMessageIdStatement){
- this.prevDurableSubscriberMessageIdStatement=prevDurableSubscriberMessageIdStatement;
- }
+
/**
* @param findNextMessagesStatement the findNextMessagesStatement to set
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Mon Nov 20 05:11:26 2006
@@ -613,71 +613,7 @@
close(rs);
close(s);
}
- }
-
- /**
- * @param c
- * @param destination
- * @param clientId
- * @param subscriberName
- * @param id
- * @return the previous Id
- * @throws Exception
- * @see org.apache.activemq.store.jdbc.JDBCAdapter#doGetPrevDurableSubscriberMessageStatement(org.apache.activemq.store.jdbc.TransactionContext,
- * org.apache.activemq.command.ActiveMQDestination, java.lang.String, java.lang.String, java.lang.String)
- */
- public void doGetPrevDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
- String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception{
- PreparedStatement s=null;
- ResultSet rs=null;
- try{
- s=c.getConnection().prepareStatement(statements.getPrevDurableSubscriberMessageIdStatement());
- s.setString(1,destination.getQualifiedName());
- s.setLong(2,id);
- rs=s.executeQuery();
- if (rs.next()) {
- listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
- }
- listener.finished();
-
- }finally{
- close(rs);
- close(s);
- }
- }
-
- /**
- * @param c
- * @param destination
- * @param clientId
- * @param subscriberName
- * @param id
- * @return the next id
- * @throws SQLException
- * @throws IOException
- * @see org.apache.activemq.store.jdbc.JDBCAdapter#doGetNextDurableSubscriberMessageIdStatement(org.apache.activemq.store.jdbc.TransactionContext,
- * org.apache.activemq.command.ActiveMQDestination, java.lang.String, java.lang.String, java.lang.String)
- */
- public void doGetNextDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
- String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception{
- PreparedStatement s=null;
- ResultSet rs=null;
- try{
- s=c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageIdStatement());
- s.setString(1,destination.getQualifiedName());
- s.setLong(2,id);
- rs=s.executeQuery();
- if (rs.next()) {
- listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
- }
- listener.finished();
-
- }finally{
- close(rs);
- close(s);
- }
- }
-
+ }
public int doGetMessageCount(TransactionContext c,ActiveMQDestination destination) throws SQLException, IOException{
PreparedStatement s=null;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Mon Nov 20 05:11:26 2006
@@ -65,14 +65,14 @@
}
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
- StoreEntry item = messageContainer.placeLast(message);
- // TODO: we should do the following but it is not need if the message is being added within a persistence transaction
- // but since I can't tell if one is running right now.. I'll leave this out for now.
-// if( message.isResponseRequired() ) {
-// messageContainer.force();
-// }
+ StoreEntry item=messageContainer.placeLast(message);
+ // TODO: we should do the following but it is not need if the message is being added within a persistence
+ // transaction
+ // but since I can't tell if one is running right now.. I'll leave this out for now.
+ // if( message.isResponseRequired() ) {
+ // messageContainer.force();
+ // }
cache.put(message.getMessageId(),item);
-
}
public synchronized void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
@@ -84,6 +84,7 @@
Message result=null;
StoreEntry entry=(StoreEntry)cache.get(identity);
if(entry!=null){
+ entry = messageContainer.refresh(entry);
result = (Message)messageContainer.get(entry);
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
@@ -109,6 +110,7 @@
public synchronized void removeMessage(MessageId msgId) throws IOException{
StoreEntry entry=(StoreEntry)cache.remove(msgId);
if(entry!=null){
+ entry = messageContainer.refresh(entry);
messageContainer.remove(entry);
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Mon Nov 20 05:11:26 2006
@@ -74,7 +74,7 @@
Set rc=new HashSet();
try{
Store store=getStore();
- for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
+ for(Iterator i=store.getListContainerIds().iterator();i.hasNext();){
Object obj=i.next();
if(obj instanceof ActiveMQDestination){
rc.add(obj);
@@ -161,7 +161,11 @@
public void deleteAllMessages() throws IOException{
if(theStore!=null){
- theStore.delete();
+ if(theStore.isInitialized()){
+ theStore.clear();
+ }else{
+ theStore.delete();
+ }
}
}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java Mon Nov 20 05:11:26 2006
@@ -29,7 +29,7 @@
*/
public class LoadTest extends TestCase{
static final int COUNT=10000;
- static final int NUM_LOADERS=2;
+ static final int NUM_LOADERS=5;
protected String name="load.db";
protected KahaStore store;
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java?view=diff&rev=477166&r1=477165&r2=477166
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java Mon Nov 20 05:11:26 2006
@@ -91,6 +91,7 @@
startLoad=System.currentTimeMillis();
value = getData(2048);
for(int i=0;i<count;i++){
+ //System.out.println(this + " Container size = " + container.size());
String key="key:"+i;
container.put(key,value);
}
|