activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1497843 - in /activemq/trunk/activemq-leveldb-store/src/main: java/org/apache/activemq/leveldb/replicated/ scala/org/apache/activemq/leveldb/replicated/
Date Fri, 28 Jun 2013 16:57:29 GMT
Author: chirino
Date: Fri Jun 28 16:57:29 2013
New Revision: 1497843

URL: http://svn.apache.org/r1497843
Log:
Expose the replicated store status via JMX.

Added:
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java
Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala

Added: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java?rev=1497843&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java
(added)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java
Fri Jun 28 16:57:29 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.leveldb.replicated;
+
+import org.apache.activemq.broker.jmx.MBeanInfo;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface ReplicatedLevelDBStoreViewMBean {
+
+    @MBeanInfo("The address of the ZooKeeper server.")
+    String getZkAddress();
+    @MBeanInfo("The path in ZooKeeper to hold master elections.")
+    String getZkPath();
+    @MBeanInfo("The ZooKeeper session timeout.")
+    String getZkSessionTmeout();
+    @MBeanInfo("The address and port the master will bind for the replication protocol.")
+    String getBind();
+    @MBeanInfo("The number of replication nodes that will be part of the replication cluster.")
+    int getReplicas();
+
+    @MBeanInfo("The role of this node in the replication cluster.")
+    String getNodeRole();
+
+    @MBeanInfo("The replication status.")
+    String getStatus();
+
+    @MBeanInfo("The current position of the replication log.")
+    Long getPosition();
+
+    @MBeanInfo("The directory holding the data.")
+    String getDirectory();
+
+    @MBeanInfo("The size the log files are allowed to grow to.")
+    long getLogSize();
+
+    @MBeanInfo("The implementation of the LevelDB index being used.")
+    String getIndexFactory();
+
+    @MBeanInfo("Is data verified against checksums as it's loaded back from disk.")
+    boolean getVerifyChecksums();
+
+    @MBeanInfo("The maximum number of open files the index will open at one time.")
+    int getIndexMaxOpenFiles();
+
+    @MBeanInfo("Number of keys between restart points for delta encoding of keys in the index")
+    int getIndexBlockRestartInterval();
+
+    @MBeanInfo("Do aggressive checking of store data")
+    boolean getParanoidChecks();
+
+    @MBeanInfo("Amount of data to build up in memory for the index before converting to a
sorted on-disk file.")
+    int getIndexWriteBufferSize();
+
+    @MBeanInfo("Approximate size of user data packed per block for the index")
+    int getIndexBlockSize();
+
+    @MBeanInfo("The type of compression to use for the index")
+    String getIndexCompression();
+
+    @MBeanInfo("The size of the cache index")
+    long getIndexCacheSize();
+
+    @MBeanInfo("The maximum amount of async writes to buffer up")
+    int getAsyncBufferSize();
+
+    @MBeanInfo("The sync strategy to use.")
+    String getSync();
+
+}

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala?rev=1497843&r1=1497842&r2=1497843&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
Fri Jun 28 16:57:29 2013
@@ -20,8 +20,8 @@ import org.fusesource.fabric.groups._
 import org.fusesource.fabric.zookeeper.internal.ZKClient
 import org.linkedin.util.clock.Timespan
 import scala.reflect.BeanProperty
-import org.apache.activemq.util.{ServiceStopper, ServiceSupport}
-import org.apache.activemq.leveldb.{LevelDBClient, RecordLog, LevelDBStore}
+import org.apache.activemq.util.{JMXSupport, ServiceStopper, ServiceSupport}
+import org.apache.activemq.leveldb.{LevelDBStoreViewMBean, LevelDBClient, RecordLog, LevelDBStore}
 import java.net.{NetworkInterface, InetAddress}
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.broker.Locker
@@ -32,6 +32,10 @@ import org.apache.activemq.leveldb.util.
 import java.io.File
 import org.apache.activemq.usage.SystemUsage
 import org.apache.activemq.ActiveMQMessageAuditNoSync
+import org.fusesource.hawtdispatch
+import org.apache.activemq.broker.jmx.{BrokerMBeanSupport, AnnotatedMBean}
+import org.apache.activemq.leveldb.LevelDBStore._
+import javax.management.ObjectName
 
 object ElectingLevelDBStore extends Log {
 
@@ -141,6 +145,16 @@ class ElectingLevelDBStore extends Proxy
 
   def init() {
 
+    if(brokerService!=null){
+      try {
+        AnnotatedMBean.registerMBean(brokerService.getManagementContext, new ReplicatedLevelDBStoreView(this),
objectName)
+      } catch {
+        case e: Throwable => {
+          warn(e, "PersistenceAdapterReplication could not be registered in JMX: " + e.getMessage)
+        }
+      }
+    }
+
     // Figure out our position in the store.
     directory.mkdirs()
     val log = new RecordLog(directory, LevelDBClient.LOG_SUFFIX)
@@ -217,11 +231,21 @@ class ElectingLevelDBStore extends Proxy
     })
   }
 
+  def objectName = {
+    var objectNameStr = brokerService.getBrokerObjectName.toString;
+    objectNameStr += "," + "Service=PersistenceAdapterReplication";
+    objectNameStr += "," + "InstanceName=" + JMXSupport.encodeObjectNamePart("LevelDB[" +
directory.getAbsolutePath + "]");
+    new ObjectName(objectNameStr);
+  }
+
   protected def doStart() = {
     master_started_latch.await()
   }
 
   protected def doStop(stopper: ServiceStopper) {
+    if(brokerService!=null){
+      brokerService.getManagementContext().unregisterMBean(objectName);
+    }
     zk_client.close()
     zk_client = null
     if( master_started.get() ) {
@@ -327,3 +351,59 @@ class ElectingLevelDBStore extends Proxy
     }
   }
 }
+
+
+class ReplicatedLevelDBStoreView(val store:ElectingLevelDBStore) extends ReplicatedLevelDBStoreViewMBean
{
+  import store._
+
+  def getZkAddress = zkAddress
+  def getZkPath = zkPath
+  def getZkSessionTmeout = zkSessionTmeout
+  def getBind = bind
+  def getReplicas = replicas
+
+  def getNodeRole:String = {
+    if( slave!=null ) {
+      return "slave"
+    }
+    if( master!=null ) {
+      return "master"
+    }
+    "electing"
+  }
+
+  def getStatus:String = {
+    if( slave!=null ) {
+      return slave.status
+    }
+    if( master!=null ) {
+      return master.status
+    }
+    ""
+  }
+
+  def getPosition:java.lang.Long = {
+    if( slave!=null ) {
+      return new java.lang.Long(slave.wal_append_position)
+    }
+    if( master!=null ) {
+      return new java.lang.Long(master.wal_append_position)
+    }
+    null
+  }
+
+  def getAsyncBufferSize = asyncBufferSize
+  def getDirectory = directory.getCanonicalPath
+  def getIndexBlockRestartInterval = indexBlockRestartInterval
+  def getIndexBlockSize = indexBlockSize
+  def getIndexCacheSize = indexCacheSize
+  def getIndexCompression = indexCompression
+  def getIndexFactory = indexFactory
+  def getIndexMaxOpenFiles = indexMaxOpenFiles
+  def getIndexWriteBufferSize = indexWriteBufferSize
+  def getLogSize = logSize
+  def getParanoidChecks = paranoidChecks
+  def getSync = sync
+  def getVerifyChecksums = verifyChecksums
+
+}

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala?rev=1497843&r1=1497842&r2=1497843&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
Fri Jun 28 16:57:29 2013
@@ -80,6 +80,8 @@ class MasterLevelDBStore extends LevelDB
 
   val slaves = new ConcurrentHashMap[String,SlaveState]()
 
+  def status = slaves.values().map(_.status).mkString(", ")
+
   override def doStart = {
     unstash(directory)
     super.doStart

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala?rev=1497843&r1=1497842&r2=1497843&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
Fri Jun 28 16:57:29 2013
@@ -51,6 +51,8 @@ class SlaveLevelDBStore extends LevelDBS
   var wal_session:Session = _
   var transfer_session:Session = _
 
+  var status = "initialized"
+
   override def doStart() = {
     client.init()
     if (purgeOnStatup) {
@@ -90,14 +92,16 @@ class SlaveLevelDBStore extends LevelDBS
     transport.setDispatchQueue(queue)
     transport.connecting(new URI(connect), null)
 
-    info("Connecting to master: "+connect)
+    status = "Connecting to master: "+connect
+    info(status)
     wal_session = new Session(transport, (session)=>{
       // lets stash away our current state so that we can unstash it
       // in case we don't get caught up..  If the master dies,
       // the stashed data might be the best option to become the master.
       stash(directory)
       delete_store(directory)
-      debug("Connected to master.  Syncing")
+      status = "Connected to master.  Syncing"
+      debug(status)
       session.request_then(SYNC_ACTION, null) { body =>
         val response = JsonCodec.decode(body, classOf[SyncResponse])
         transfer_missing(response)
@@ -266,9 +270,22 @@ class SlaveLevelDBStore extends LevelDBS
     transport.setDispatchQueue(queue)
     transport.connecting(new URI(connect), null)
 
-    info("Connecting catchup session...")
+    status = "Connecting catchup session."
+    info(status)
     transfer_session = new Session(transport, (session)=> {
-      info("Catchup session connected...")
+
+      var total_files = 0
+      var total_size = 0L
+      var downloaded_size = 0L
+      var downloaded_files = 0
+
+      def update_download_status = {
+        status = "Slave catching up. Downloaded %.2f/%.2f kb and %d/%d files".format(downloaded_size/1024f,
total_size/1024f, downloaded_files, total_files)
+        info(status)
+      }
+
+      status = "Catchup session connected..."
+      info(status)
 
       // Transfer the log files..
       var append_offset = 0L
@@ -322,11 +339,15 @@ class SlaveLevelDBStore extends LevelDBS
           transfer.offset = 0
           transfer.length = x.length
           debug("Slave requested: "+transfer.file)
+          total_size += x.length
+          total_files += 1
           session.request_then(GET_ACTION, transfer) { body =>
             val buffer = map(target_file, 0, x.length, false)
             session.codec.readData(buffer, ^{
               unmap(buffer)
-              info("Slave downloaded: "+transfer.file+" ("+x.length+" bytes)")
+              downloaded_size += x.length
+              downloaded_files += 1
+              update_download_status
             })
           }
         }
@@ -342,18 +363,23 @@ class SlaveLevelDBStore extends LevelDBS
         transfer.offset = 0
         transfer.length = x.length
         info("Slave requested: "+transfer.file)
+        total_size += x.length
+        total_files += 1
         session.request_then(GET_ACTION, transfer) { body =>
           val buffer = map(dirty_index / x.file, 0, x.length, false)
           session.codec.readData(buffer, ^{
             unmap(buffer)
-            info("Slave downloaded: "+transfer.file+" ("+x.length+" bytes)")
+            downloaded_size += x.length
+            downloaded_files += 1
+            update_download_status
           })
         }
       }
 
       session.request_then(DISCONNECT_ACTION, null) { body =>
         // Ok we are now caught up.
-        info("Slave has now caught up")
+        status = "Synchronize"
+        info(status)
         stash_clear(directory) // we don't need the stash anymore.
         transport.stop(NOOP)
         transfer_session = null



Mime
View raw message