From commits-return-7817-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Wed Dec 12 22:54:26 2007 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 85525 invoked from network); 12 Dec 2007 22:54:23 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 12 Dec 2007 22:54:23 -0000 Received: (qmail 84743 invoked by uid 500); 12 Dec 2007 22:54:12 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 84725 invoked by uid 500); 12 Dec 2007 22:54:12 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 84715 invoked by uid 99); 12 Dec 2007 22:54:11 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Dec 2007 14:54:11 -0800 X-ASF-Spam-Status: No, hits=-98.0 required=10.0 tests=ALL_TRUSTED,URIBL_BLACK X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Dec 2007 22:53:58 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A3E3E1A9832; Wed, 12 Dec 2007 14:54:01 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r603762 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/ main/java/org/apache/activemq/store/amq/ main/java/org/apache/activemq/store/kahadaptor/ main/java/org/apache/activ... Date: Wed, 12 Dec 2007 22:54:00 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20071212225401.A3E3E1A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Wed Dec 12 14:53:59 2007 New Revision: 603762 URL: http://svn.apache.org/viewvc?rev=603762&view=rev Log: Fix for https://issues.apache.org/activemq/browse/AMQ-1507 Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?rev=603762&r1=603761&r2=603762&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java Wed Dec 12 14:53:59 2007 @@ -272,8 +272,22 @@ */ long size(); + /** + * @return true if persistent indexes are used by default + */ public boolean isPersistentIndex(); + /** + * Set a persistent index as the default if the parameter is true + * @param persistentIndex + */ public void setPersistentIndex(boolean persistentIndex); + + /** + * An explict call to initialize - this will also be called + * implicitly for any other operation on the store. + * @throws IOException + */ + public void initialize() throws IOException; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?rev=603762&r1=603761&r2=603762&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Wed Dec 12 14:53:59 2007 @@ -445,10 +445,10 @@ } if (!initialized) { - LOG.info("Kaha Store using data directory " + directory); + lockFile = new RandomAccessFile(new File(directory, "lock"), "rw"); lock(); - + LOG.info("Kaha Store using data directory " + directory); DataManager defaultDM = getDataManager(DEFAULT_CONTAINER_NAME); rootIndexManager = getIndexManager(defaultDM, DEFAULT_CONTAINER_NAME); IndexItem mapRoot = new IndexItem(); @@ -486,6 +486,7 @@ if (!BROKEN_FILE_LOCK) { lock = lockFile.getChannel().tryLock(); if (lock == null) { + initialized=false; throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by another application"); } else { @@ -493,6 +494,7 @@ } } } else { // already locked + initialized=false; throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by this application."); } @@ -501,7 +503,7 @@ private synchronized void unlock() throws IOException { if (!DISABLE_LOCKING && (null != directory) && (null != lock)) { - System.getProperties().remove(getPropertyKey()); + System.clearProperty(getPropertyKey()); if (lock.isValid()) { lock.release(); } @@ -510,7 +512,6 @@ } private String getPropertyKey() throws IOException { - // Is replaceAll() needed? Should test without it. return getClass().getName() + ".lock." + directory.getCanonicalPath(); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=603762&r1=603761&r2=603762&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Dec 12 14:53:59 2007 @@ -18,6 +18,8 @@ import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileLock; import java.util.Date; import java.util.HashSet; import java.util.Iterator; @@ -83,6 +85,10 @@ private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class); private final ConcurrentHashMap queues = new ConcurrentHashMap(); private final ConcurrentHashMap topics = new ConcurrentHashMap(); + private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq"; + private static final boolean BROKEN_FILE_LOCK; + private static final boolean DISABLE_LOCKING; + private AsyncDataManager asyncDataManager; private ReferenceStoreAdapter referenceStoreAdapter; private TaskRunnerFactory taskRunnerFactory; @@ -112,7 +118,10 @@ private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE; private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE; private Map> dataFilesInProgress = new ConcurrentHashMap> (); - + private String directoryPath = ""; + private RandomAccessFile lockFile; + private FileLock lock; + private boolean disableLocking = DISABLE_LOCKING; public String getBrokerName() { return this.brokerName; @@ -141,13 +150,17 @@ if (brokerService != null) { this.directory = brokerService.getBrokerDataDirectory(); } else { + this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName)); this.directory = new File(directory, "amqstore"); + this.directoryPath=directory.getAbsolutePath(); } } if (this.directoryArchive == null) { this.directoryArchive = new File(this.directory,"archive"); } + lockFile = new RandomAccessFile(new File(directory, "lock"), "rw"); + lock(); LOG.info("AMQStore starting using directory: " + directory); this.directory.mkdirs(); if (archiveDataLogs) { @@ -240,6 +253,11 @@ if (!started.compareAndSet(true, false)) { return; } + if (lockFile != null) { + lockFile.close(); + lockFile = null; + } + unlock(); this.usageManager.getMemoryUsage().removeUsageListener(this); synchronized (this) { Scheduler.cancel(periodicCheckpointTask); @@ -818,7 +836,15 @@ public void setArchiveDataLogs(boolean archiveDataLogs) { this.archiveDataLogs = archiveDataLogs; - } + } + + public boolean isDisableLocking() { + return disableLocking; + } + + public void setDisableLocking(boolean disableLocking) { + this.disableLocking = disableLocking; + } protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) { @@ -836,4 +862,72 @@ set.remove(dataFileId); } } + + + + protected void lock() throws IOException, InterruptedException { + boolean logged = false; + boolean aquiredLock = false; + do { + if (doLock()) { + aquiredLock = true; + } else { + if (!logged) { + LOG.warn("Waiting to Lock the Store " + getDirectory()); + logged = true; + } + Thread.sleep(1000); + } + + if (aquiredLock && logged) { + LOG.info("Aquired lock for AMQ Store" + getDirectory()); + } + + } while (!aquiredLock && !disableLocking); + } + + private synchronized void unlock() throws IOException { + if (!disableLocking && (null != directory) && (null != lock)) { + System.clearProperty(getPropertyKey()); + if (lock.isValid()) { + lock.release(); + } + lock = null; + } + } + + + protected boolean doLock() throws IOException { + boolean result = true; + if (!disableLocking && directory != null && lock == null) { + String key = getPropertyKey(); + String property = System.getProperty(key); + if (null == property) { + if (!BROKEN_FILE_LOCK) { + lock = lockFile.getChannel().tryLock(); + if (lock == null) { + result = false; + } else { + System.setProperty(key, new Date().toString()); + } + } + } else { // already locked + result = false; + } + } + return result; + } + + private String getPropertyKey() throws IOException { + return getClass().getName() + ".lock." + directory.getCanonicalPath(); + } + + static { + BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX + + ".FileLockBroken", + "false")); + DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX + + ".DisableLocking", + "false")); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?rev=603762&r1=603761&r2=603762&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java Wed Dec 12 14:53:59 2007 @@ -56,6 +56,7 @@ return recoverMessage(message); } else { LOG.error("Message id " + ref + " could not be recovered from the data store!"); + Thread.dumpStack(); } return false; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=603762&r1=603761&r2=603762&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Wed Dec 12 14:53:59 2007 @@ -39,6 +39,7 @@ import org.apache.activemq.kaha.MessageIdMarshaller; import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreFactory; +import org.apache.activemq.kaha.impl.StoreLockedExcpetion; import org.apache.activemq.kaha.impl.index.hash.HashIndex; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.ReferenceStore; @@ -53,7 +54,7 @@ - private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class); + private static final Log LOG = LogFactory.getLog(KahaReferenceStoreAdapter.class); private static final String STORE_STATE = "store-state"; private static final String INDEX_VERSION_NAME = "INDEX_VERSION"; private static final Integer INDEX_VERSION = new Integer(3); @@ -87,7 +88,7 @@ @Override public synchronized void start() throws Exception { super.start(); - Store store = getStateStore(); + Store store = getStateStore(); boolean empty = store.getMapContainerIds().isEmpty(); stateMap = store.getMapContainer("state", STORE_STATE); stateMap.load(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java?rev=603762&r1=603761&r2=603762&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java Wed Dec 12 14:53:59 2007 @@ -43,7 +43,7 @@ maxNumberOfArrays = Math.max(maxNumberOfArrays, 1); list = new LinkedList(); for (int i = 0; i < maxNumberOfArrays; i++) { - list.add(new BitArray()); + list.add(null); } } @@ -130,6 +130,10 @@ bin = list.size() - 1; } answer = list.get(bin); + if (answer == null) { + answer = new BitArray(); + list.set(bin, answer); + } } return answer; } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java?rev=603762&r1=603761&r2=603762&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java Wed Dec 12 14:53:59 2007 @@ -38,20 +38,15 @@ protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false"; protected void setUp() throws Exception { + messageCount = 10000; if (System.getProperty("basedir") == null) { File file = new File("."); System.setProperty("basedir", file.getAbsolutePath()); } failureCount = super.messageCount / 2; super.topic = isTopic(); - BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml())); - brokerFactory.afterPropertiesSet(); - master = brokerFactory.getBroker(); - brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml())); - brokerFactory.afterPropertiesSet(); - slave = brokerFactory.getBroker(); - master.start(); - slave.start(); + createMaster(); + createSlave(); // wait for thing to connect Thread.sleep(1000); super.setUp(); @@ -87,5 +82,19 @@ protected boolean isTopic() { return false; + } + + protected void createMaster() throws Exception { + BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml())); + brokerFactory.afterPropertiesSet(); + master = brokerFactory.getBroker(); + master.start(); + } + + protected void createSlave() throws Exception { + BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml())); + brokerFactory.afterPropertiesSet(); + slave = brokerFactory.getBroker(); + slave.start(); } } Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java?rev=603762&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java Wed Dec 12 14:53:59 2007 @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.ft; + +import org.apache.activemq.xbean.BrokerFactoryBean; +import org.springframework.core.io.ClassPathResource; + + +public class QueueMasterSlaveTestUsingSharedFileTest extends + QueueMasterSlaveTest { + + protected String getSlaveXml() { + return "org/apache/activemq/broker/ft/sharedFileSlave.xml"; + } + + protected String getMasterXml() { + return "org/apache/activemq/broker/ft/sharedFileMaster.xml"; + } + + protected void createSlave() throws Exception { + new Thread(new Runnable() { + + public void run() { + try { + QueueMasterSlaveTestUsingSharedFileTest.super.createSlave(); + } catch (Exception e) { + + e.printStackTrace(); + } + + } + + }).start(); + } + +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java ------------------------------------------------------------------------------ svn:eol-style = native