activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject activemq git commit: Implements https://issues.apache.org/jira/browse/AMQ-5458
Date Tue, 25 Nov 2014 16:45:26 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 74f530a64 -> ebafd5c19


Implements https://issues.apache.org/jira/browse/AMQ-5458 


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ebafd5c1
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ebafd5c1
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ebafd5c1

Branch: refs/heads/trunk
Commit: ebafd5c19388ce183299c2c41c966c5580e87821
Parents: 74f530a
Author: Hiram Chirino <hiram@hiramchirino.com>
Authored: Tue Nov 25 11:40:48 2014 -0500
Committer: Hiram Chirino <hiram@hiramchirino.com>
Committed: Tue Nov 25 11:41:00 2014 -0500

----------------------------------------------------------------------
 .../activemq/leveldb/LevelDBStoreTestMBean.java |  56 ++++++++++
 .../apache/activemq/leveldb/LevelDBStore.scala  |  75 +++++++++++++
 .../org/apache/activemq/leveldb/RecordLog.scala | 106 ++++++++++++++++++-
 3 files changed, 232 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ebafd5c1/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreTestMBean.java
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreTestMBean.java b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreTestMBean.java
new file mode 100644
index 0000000..63338a8
--- /dev/null
+++ b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreTestMBean.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb;
+
+import org.apache.activemq.broker.jmx.MBeanInfo;
+
+/**
+ * <p>Management interface (implemented by LevelDBStoreTest) with switches that suspend the log
+ * force, write and delete calls, plus a counter of the threads reported for each kind of call.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface LevelDBStoreTestMBean {
+
+    @MBeanInfo("Used to set if the log force calls should be suspended")
+    void setSuspendForce(boolean value);
+
+    @MBeanInfo("Gets if the log force calls should be suspended")
+    boolean getSuspendForce();
+
+    @MBeanInfo("Gets the number of threads waiting to do a log force call.")
+    long getForceCalls();
+
+    @MBeanInfo("Used to set if the log write calls should be suspended")
+    void setSuspendWrite(boolean value);
+
+    @MBeanInfo("Gets if the log write calls should be suspended")
+    boolean getSuspendWrite();
+
+    @MBeanInfo("Gets the number of threads waiting to do a log write call.")
+    long getWriteCalls();
+
+    @MBeanInfo("Used to set if the log delete calls should be suspended")
+    void setSuspendDelete(boolean value);
+
+    @MBeanInfo("Gets if the log delete calls should be suspended")
+    boolean getSuspendDelete();
+
+    @MBeanInfo("Gets the number of threads waiting to do a log delete call.")
+    long getDeleteCalls();
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebafd5c1/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index f86e05b..b10cd3e 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -37,6 +37,7 @@ import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
 import org.fusesource.hawtdispatch;
 import org.apache.activemq.broker.scheduler.JobSchedulerStore
 import org.apache.activemq.store.IndexListener.MessageContext
+import javax.management.ObjectName
 
 object LevelDBStore extends Log {
   val DEFAULT_DIRECTORY = new File("LevelDB");
@@ -82,6 +83,74 @@ case class DurableSubscription(subKey:Long, topicKey:Long, info: SubscriptionInf
   var cursorPosition = 0L
 }
 
+class LevelDBStoreTest(val store:LevelDBStore) extends LevelDBStoreTestMBean {
+  // Test-only bean: lets a test suspend/resume the record log's force, write and delete calls.
+  import store._
+  var suspendForce = false; // last value applied by setSuspendForce
+  // Suspends (true) or resumes (false) force calls; a value equal to the current flag is a no-op.
+  override def setSuspendForce(value: Boolean): Unit = this.synchronized {
+    if( suspendForce!=value ) {
+      suspendForce = value;
+      if( suspendForce ) {
+        db.client.log.recordLogTestSupport.forceCall.suspend // NOTE(review): recordLogTestSupport is null unless the org.apache.activemq.leveldb.test property was true when the log was created; presumably this bean is only used in that mode - confirm
+      } else {
+        db.client.log.recordLogTestSupport.forceCall.resume
+      }
+    }
+  }
+  // Returns the flag last stored by setSuspendForce.
+  override def getSuspendForce: Boolean = this.synchronized {
+    suspendForce
+  }
+  // Returns the current value of forceCall's thread counter.
+  override def getForceCalls = this.synchronized {
+    db.client.log.recordLogTestSupport.forceCall.threads.get()
+  }
+
+  var suspendWrite = false; // last value applied by setSuspendWrite
+  // Suspends (true) or resumes (false) write calls; a value equal to the current flag is a no-op.
+  override def setSuspendWrite(value: Boolean): Unit = this.synchronized {
+    if( suspendWrite!=value ) {
+      suspendWrite = value;
+      if( suspendWrite ) {
+        db.client.log.recordLogTestSupport.writeCall.suspend
+      } else {
+        db.client.log.recordLogTestSupport.writeCall.resume
+      }
+    }
+  }
+  // Returns the flag last stored by setSuspendWrite.
+  override def getSuspendWrite: Boolean = this.synchronized {
+    suspendWrite
+  }
+  // Returns the current value of writeCall's thread counter.
+  override def getWriteCalls = this.synchronized {
+    db.client.log.recordLogTestSupport.writeCall.threads.get()
+  }
+
+  var suspendDelete = false; // last value applied by setSuspendDelete
+  // Suspends (true) or resumes (false) delete calls; a value equal to the current flag is a no-op.
+  override def setSuspendDelete(value: Boolean): Unit = this.synchronized {
+    if( suspendDelete!=value ) {
+      suspendDelete = value;
+      if( suspendDelete ) {
+        db.client.log.recordLogTestSupport.deleteCall.suspend
+      } else {
+        db.client.log.recordLogTestSupport.deleteCall.resume
+      }
+    }
+  }
+  // Returns the flag last stored by setSuspendDelete.
+  override def getSuspendDelete: Boolean = this.synchronized {
+    suspendDelete
+  }
+  // Returns the current value of deleteCall's thread counter.
+  override def getDeleteCalls = this.synchronized {
+    db.client.log.recordLogTestSupport.deleteCall.threads.get()
+  }  
+  
+}
+
 class LevelDBStoreView(val store:LevelDBStore) extends LevelDBStoreViewMBean {
   import store._
 
@@ -223,6 +292,10 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
     if(brokerService!=null && brokerService.isUseJmx){
       try {
         AnnotatedMBean.registerMBean(brokerService.getManagementContext, new LevelDBStoreView(this), objectName)
+        if( java.lang.Boolean.getBoolean("org.apache.activemq.leveldb.test") ) {
+          val name = new ObjectName(objectName.toString + ",test=test")
+          AnnotatedMBean.registerMBean(brokerService.getManagementContext, new LevelDBStoreTest(this), name)
+        }
       } catch {
         case e: Throwable => {
           warn(e, "LevelDB Store could not be registered in JMX: " + e.getMessage)
@@ -279,6 +352,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
     db.stop
     if(brokerService!=null && brokerService.isUseJmx){
       brokerService.getManagementContext().unregisterMBean(objectName);
+      if( java.lang.Boolean.getBoolean("org.apache.activemq.leveldb.test") )
+        brokerService.getManagementContext().unregisterMBean(new ObjectName(objectName.toString+",test=test"));
     }
     info("Stopped "+this)
   }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebafd5c1/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
index daef103..1ab66ce 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala
@@ -22,7 +22,7 @@ import java.{util=>ju}
 
 import java.util.zip.CRC32
 import java.util.Map.Entry
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 import java.io._
 import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
 import org.fusesource.hawtdispatch.BaseRetained
@@ -31,6 +31,8 @@ import org.apache.activemq.util.LRUCache
 import util.TimeMetric._
 import util.{TimeMetric, Log}
 import java.util.TreeMap
+import java.util.concurrent.locks.{ReentrantReadWriteLock, ReadWriteLock}
+import java.util.concurrent.CountDownLatch
 
 object RecordLog extends Log {
 
@@ -68,6 +70,63 @@ object RecordLog extends Log {
 
 }
 
+class SuspendCallSupport {
+  // Gate for stalling callers: call() runs under the read lock; suspend parks a thread holding the write lock.
+  val lock = new ReentrantReadWriteLock()
+  var resumeLatch:CountDownLatch = _ // counted down by resume to release the "Suspend Lock" thread
+  var resumedLatch:CountDownLatch = _ // counted down by that thread after it has released the write lock
+  @volatile
+  var threads = new AtomicInteger() // callers currently inside call(); NOTE(review): never reassigned in this class, could be a val
+  // Starts a "Suspend Lock" thread and returns once that thread holds the write lock, so later call()s block.
+  def suspend = this.synchronized {
+    val suspended = new CountDownLatch(1)
+    resumeLatch = new CountDownLatch(1)
+    resumedLatch = new CountDownLatch(1)
+    new Thread("Suspend Lock") {
+      override def run = {
+        try {
+          lock.writeLock().lock()
+          suspended.countDown()
+          resumeLatch.await() // keep holding the write lock until resume counts this latch down
+        } finally {
+          lock.writeLock().unlock();
+          resumedLatch.countDown()
+        }
+      }
+    }.start()
+    suspended.await() // NOTE(review): a second suspend without a resume looks like it would wait here forever (write lock still held by the first thread) - confirm callers guard against it
+  }
+  // Releases the "Suspend Lock" thread and waits until it has unlocked; does nothing when resumedLatch is null.
+  def resume = this.synchronized {
+    if( resumedLatch != null ) {
+      resumeLatch.countDown()
+      resumedLatch.await();
+      resumeLatch = null
+      resumedLatch = null
+    }
+  }
+  // Evaluates func while holding the read lock and returns its result; the lock is released even if func throws.
+  def call[T](func: =>T):T= {
+    threads.incrementAndGet() // incremented before the lock is taken, so callers blocked on the lock are counted too
+    lock.readLock().lock()
+    try {
+      func
+    } finally {
+      threads.decrementAndGet()
+      lock.readLock().unlock()
+    }
+  }
+
+}
+
+class RecordLogTestSupport {
+  // Groups one SuspendCallSupport gate per kind of log operation that a test can stall.
+  val forceCall = new SuspendCallSupport() // RecordLog wraps channel.force in this gate
+  val writeCall = new SuspendCallSupport() // RecordLog wraps channel.write in this gate
+  val deleteCall = new SuspendCallSupport() // RecordLog.onDelete wraps file.delete() in this gate
+
+}
+
 case class RecordLog(directory: File, logSuffix:String) {
   import RecordLog._
 
@@ -78,6 +137,14 @@ case class RecordLog(directory: File, logSuffix:String) {
   var verify_checksums = false
   val log_infos = new TreeMap[Long, LogInfo]()
 
+  var recordLogTestSupport:RecordLogTestSupport =
+    if( java.lang.Boolean.getBoolean("org.apache.activemq.leveldb.test") ) {
+      new RecordLogTestSupport()
+    } else {
+      null
+    }
+
+
   object log_mutex
 
   def delete(id:Long) = {
@@ -97,7 +164,13 @@ case class RecordLog(directory: File, logSuffix:String) {
   }
 
   protected def onDelete(file:File) = {
-    file.delete()
+    if( recordLogTestSupport!=null ) {
+      recordLogTestSupport.deleteCall.call {
+        file.delete()
+      }
+    } else {
+      file.delete()
+    }
   }
 
   def checksum(data: Buffer): Int = {
@@ -137,10 +210,17 @@ case class RecordLog(directory: File, logSuffix:String) {
       flush
       max_log_flush_latency {
         // only need to update the file metadata if the file size changes..
-        channel.force(append_offset > logSize)
+        if( recordLogTestSupport!=null ) {
+          recordLogTestSupport.forceCall.call {
+            channel.force(append_offset > logSize)
+          }
+        } else {
+          channel.force(append_offset > logSize)
+        }
       }
     }
 
+
     def skip(length:Long) = this.synchronized {
       flush
       append_offset += length
@@ -177,7 +257,15 @@ case class RecordLog(directory: File, logSuffix:String) {
         val buffer = data.toByteBuffer
         val pos = append_offset+LOG_HEADER_SIZE
         val remaining = buffer.remaining
-        channel.write(buffer, pos)
+
+        if( recordLogTestSupport!=null ) {
+          recordLogTestSupport.writeCall.call {
+            channel.write(buffer, pos)
+          }
+        } else {
+          channel.write(buffer, pos)
+        }
+
         flushed_offset.addAndGet(remaining)
         if( buffer.hasRemaining ) {
           throw new IOException("Short write")
@@ -200,7 +288,15 @@ case class RecordLog(directory: File, logSuffix:String) {
         val buffer = write_buffer.toBuffer.toByteBuffer
         val remaining = buffer.remaining
         val pos = append_offset-remaining
-        channel.write(buffer, pos)
+
+        if( recordLogTestSupport!=null ) {
+          recordLogTestSupport.writeCall.call {
+            channel.write(buffer, pos)
+          }
+        } else {
+          channel.write(buffer, pos)
+        }
+
         flushed_offset.addAndGet(remaining)
         if( buffer.hasRemaining ) {
           throw new IOException("Short write")


Mime
View raw message