esme-dev mailing list archives

From Andy the destroyer <andythedestro...@gmail.com>
Subject Stats code
Date Mon, 03 Aug 2009 19:12:26 GMT
Hi all,

Attached is a patch file containing basic stats gathering code. I introduced
a dependency on a scala-stats project I forked from the twitter guys and
modified. The scala-stats dependency can be downloaded from
http://github.com/andythedestroyer/scala-stats/downloads.

I initially set up 19 counters for:

* liftSessions - ( actually grabbed from SessionWatcher )
* usersLoggedIn ( incremented and decremented through User.logUserIn and
User.logUserOut )
* userCount ( User actors started since start/reset )
* groupCount

* messagesCreated
* userMessagesCreated
* interpreterMessagesCreated
* schedulerMessagesCreated

* messagesPulled ( messages pulled from MessagePullActor )

* messagesDelivered
* messagesDeliveredTrackReason
* messagesDeliveredDirectReason
* messagesDeliveredConversationReason
* messagesDeliveredResendReason
* messagesDeliveredRegularReason
* messagesDeliveredInterpreterReason

* messagesMailed
* messagesSentViaHTTP
* messagesFiltered

and 3 gauges for:
( Gauges are partial functions that run to grab a point-in-time statistic
when the stat is requested )
* users ( from Distributor )
* groups ( from Distributor )
* listeners ( from Distributor )
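
To make the counter / gauge distinction concrete, here is a minimal sketch.
SketchStats and its method bodies are hypothetical stand-ins written for this
note; they are not the scala-stats API, only an illustration of the two kinds
of statistic under the assumption that counters are created on first use and
gauges are evaluated on read.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for a stats registry; not the scala-stats API.
object SketchStats {
  private val counters = new ConcurrentHashMap[String, AtomicLong]()
  private val gauges   = new ConcurrentHashMap[String, () => Long]()

  // Counters are created lazily, the first time a name is used.
  private def counter(name: String): AtomicLong = {
    val fresh    = new AtomicLong(0L)
    val existing = counters.putIfAbsent(name, fresh)
    if (existing == null) fresh else existing
  }

  def incr(name: String): Long = counter(name).incrementAndGet()
  def decr(name: String): Long = counter(name).decrementAndGet()

  // A gauge stores the function, not a value; it runs each time it is read.
  def makeGauge(name: String)(read: => Long): Unit = {
    gauges.put(name, () => read)
    ()
  }

  // Gauges are looked up first, then counters; unknown names give None.
  def get(name: String): Option[Long] =
    Option(gauges.get(name)).map(f => f())
      .orElse(Option(counters.get(name)).map(_.get()))
}
```

A counter only changes when code calls incr or decr, while a gauge such as
"users" reports whatever the wrapped expression returns at the moment it is
requested.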

The stats MBean is dynamic, and each stat is created the first time it is
incremented. If you connect jconsole right after the server starts, you may
not see all the stats. You will need to reconnect later or invoke the
DynamicMBean.getMBeanInfo method, which jconsole only calls once, when it
connects.
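
The behaviour above follows from how a dynamic MBean describes itself. The
sketch below is an assumption-laden illustration, not the StatsMBean class
from scala-stats: SketchStatsMBean is a hypothetical name, and it only exposes
counters held in a TrieMap. It shows getMBeanInfo being rebuilt from whatever
counters exist at the time of the call.

```scala
import java.util.concurrent.atomic.AtomicLong
import javax.management.{Attribute, AttributeList, AttributeNotFoundException,
  DynamicMBean, MBeanAttributeInfo, MBeanInfo, ReflectionException}
import scala.collection.concurrent.TrieMap

// Hypothetical sketch; not the StatsMBean shipped with scala-stats.
class SketchStatsMBean(counters: TrieMap[String, AtomicLong]) extends DynamicMBean {

  // Rebuilt on every call, so it reflects the counters that exist right now.
  // A client that caches the first result will miss counters created later.
  def getMBeanInfo(): MBeanInfo = {
    val attrs = counters.keys.toArray.sorted.map { n =>
      new MBeanAttributeInfo(n, "long", "counter " + n, true, false, false)
    }
    new MBeanInfo(getClass.getName, "sketch stats", attrs, null, null, null)
  }

  def getAttribute(name: String): AnyRef =
    counters.get(name) match {
      case Some(c) => java.lang.Long.valueOf(c.get())
      case None    => throw new AttributeNotFoundException(name)
    }

  def getAttributes(names: Array[String]): AttributeList = {
    val list = new AttributeList()
    for (n <- names; c <- counters.get(n))
      list.add(new Attribute(n, java.lang.Long.valueOf(c.get())))
    list
  }

  // The sketch is read-only and exposes no operations.
  def setAttribute(attr: Attribute): Unit =
    throw new AttributeNotFoundException("read-only: " + attr.getName)
  def setAttributes(attrs: AttributeList): AttributeList = new AttributeList()
  def invoke(action: String, params: Array[AnyRef], signature: Array[String]): AnyRef =
    throw new ReflectionException(new NoSuchMethodException(action))
}
```

Registering such a bean would go through the platform MBean server
(ManagementFactory.getPlatformMBeanServer.registerMBean); that step is left
out here to keep the sketch small.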

There are two JMX operations: reset and getGatheringTime. Reset sets all
statistics to zero, but it currently also removes the gauges, which
shouldn't happen. It definitely needs some more work, but I wanted to get
some opinions.
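
One possible shape for a reset that keeps the gauges is sketched below.
ResettableStats is a hypothetical class, not code from scala-stats, and it
assumes that the gathering time means the time elapsed since the last start
or reset.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap

// Hypothetical registry; the names do not come from scala-stats.
class ResettableStats {
  private val counters = TrieMap[String, AtomicLong]()
  private val gauges   = TrieMap[String, () => Long]()
  @volatile private var startedAt = System.currentTimeMillis()

  def incr(name: String): Long =
    counters.getOrElseUpdate(name, new AtomicLong).incrementAndGet()

  def makeGauge(name: String)(read: => Long): Unit =
    gauges.update(name, () => read)

  def value(name: String): Option[Long] =
    gauges.get(name).map(f => f()).orElse(counters.get(name).map(_.get()))

  // Zero the counters and restart the clock, but leave the gauge
  // registrations untouched so they survive a reset.
  def reset(): Unit = {
    counters.values.foreach(_.set(0L))
    startedAt = System.currentTimeMillis()
  }

  // Assumption: gathering time is measured from the last start or reset.
  def gatheringTimeMillis: Long = System.currentTimeMillis() - startedAt
}
```

Keeping counters and gauges in separate maps makes it easy for reset to touch
only the counters.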

Here are a few items that can be added or improved.
* Timers
Timers are functions that wrap other functions, time how long the wrapped
function takes to execute, and gather max / min / avg.
Maybe wrap the message parser? We could also add other metrics, such as
standard deviation and/or the rate of increase/decrease in duration over
time periods.
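
A timer along these lines could be sketched as follows. SketchTimer is a
hypothetical class, not part of the attached patch or of scala-stats; it wraps
a block, measures it with System.nanoTime, and keeps min / max / avg.

```scala
// Hypothetical timer; not part of the attached patch or of scala-stats.
class SketchTimer {
  private var count = 0L
  private var total = 0L
  private var minNs = Long.MaxValue
  private var maxNs = 0L

  // Record one duration in nanoseconds. Kept separate from time() so that
  // known values can be fed in directly.
  def record(nanos: Long): Unit = synchronized {
    count += 1
    total += nanos
    if (nanos < minNs) minNs = nanos
    if (nanos > maxNs) maxNs = nanos
  }

  // Run the wrapped block, record how long it took (even if it throws),
  // and return its result unchanged.
  def time[A](body: => A): A = {
    val start = System.nanoTime()
    try body finally record(System.nanoTime() - start)
  }

  // All three are None until at least one duration has been recorded.
  def min: Option[Long]   = synchronized { if (count == 0) None else Some(minNs) }
  def max: Option[Long]   = synchronized { if (count == 0) None else Some(maxNs) }
  def avg: Option[Double] = synchronized { if (count == 0) None else Some(total.toDouble / count) }
}
```

Wrapping a call then looks like `timer.time { parse(text) }`, where parse
stands for whatever function is being measured.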

* Integrate Lift statistics
Lower-level stats like HTTP request counters and timers, HTTP response code
counters, DB connection pool counters, and maybe a small array of the
longest-running DB queries.

* Add initializer stack for scala-stats
Add a mutable list of functions that will be called when the Stats object is
started or reset. This could initialize statistics from a db or elsewhere.
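
The initializer idea could look roughly like the sketch below. InitHooks and
addInitializer are hypothetical names chosen for illustration; nothing like
this exists in scala-stats yet.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical initializer stack; the names are illustrative only.
class InitHooks {
  private val hooks = ListBuffer[() => Unit]()

  // Register a function to run whenever stats are started or reset.
  def addInitializer(f: () => Unit): Unit = synchronized {
    hooks += f
    ()
  }

  // Snapshot the list under the lock, then run the hooks in the order
  // they were added.
  private def runAll(): Unit = {
    val snapshot = synchronized { hooks.toList }
    snapshot.foreach(f => f())
  }

  def start(): Unit = runAll()
  def reset(): Unit = runAll()
}
```

A hook could, for example, seed a counter from a value stored in the DB so
that totals carry over after a restart.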

Let me know if this is the direction you guys are thinking of or if I am way
off the mark.
Thanks
Andy

***************************

Index: src/main/scala/org/apache/esme/model/User.scala
===================================================================
--- src/main/scala/org/apache/esme/model/User.scala    (revision 800477)
+++ src/main/scala/org/apache/esme/model/User.scala    (working copy)
@@ -45,6 +45,8 @@
 import java.net.URL
 import java.util.logging._

+import com.twitter.service.{ Stats => ESMEStatistics }
+
 object User extends User with KeyedMetaMapper[Long, User] {
   override def afterSave = profileChanged _ :: notifyActors _ :: super.afterSave

@@ -382,6 +384,9 @@
     curUser.remove()
     curUserId(Full(who.id.toString))
     onLogIn.foreach(_(who))
+
+    // This may not be accurate because of users timing out instead of logging out
+    ESMEStatistics incr "usersLoggedIn"
   }

   def logoutCurrentUser = logUserOut()
@@ -391,6 +396,7 @@
     curUserId.remove()
     curUser.remove()
     S.request.foreach(_.request.getSession.invalidate)
+    ESMEStatistics getCounter "usersLoggedIn" decr
   }

   private object curUserId extends SessionVar[Box[String]](Empty)
Index: src/main/scala/org/apache/esme/lib/AccessPoolMgr.scala
===================================================================
--- src/main/scala/org/apache/esme/lib/AccessPoolMgr.scala    (revision 800477)
+++ src/main/scala/org/apache/esme/lib/AccessPoolMgr.scala    (working copy)
@@ -85,7 +85,7 @@
     }

     bind("add", in,
-         "poolName" -> text("", addNewPool, "id" -> theInput)
+         "poolName" -> text("", addNewPool _, "id" -> theInput)
     )

   }
@@ -138,7 +138,7 @@

poolId.set(p.toLong);
                                                        redisplayPool()},
                                                  "id" -> editPoolName),
-         "username" -> text(username, username = _, "id" -> editUsername),
+         "username" -> text(username, username = _:String, "id" -> editUsername),
          "permission" -> select(permissions, Empty, addPoolUser, "id" -> editPermission)
     )

Index: src/main/scala/org/apache/esme/lib/ActionMgr.scala
===================================================================
--- src/main/scala/org/apache/esme/lib/ActionMgr.scala    (revision 800477)
+++ src/main/scala/org/apache/esme/lib/ActionMgr.scala    (working copy)
@@ -123,7 +123,7 @@


     bind("main", in,
-         "name" -> text(name, name = _, "id" -> mainName),
+         "name" -> text(name, name = _:String, "id" -> mainName),
          "test" -> textarea(test, test = _, "id" -> mainTest),
          "action" -> textarea("", doSave, "id" -> mainAction)
     )
Index: src/main/scala/org/apache/esme/lib/AuthMgr.scala
===================================================================
--- src/main/scala/org/apache/esme/lib/AuthMgr.scala    (revision 800477)
+++ src/main/scala/org/apache/esme/lib/AuthMgr.scala    (working copy)
@@ -100,7 +100,7 @@
     }

     bind("main", in,
-         "token" -> text("", addAuthToken, "id" -> theInput)
+         "token" -> text("", addAuthToken _, "id" -> theInput)
     )

   }
Index: src/main/scala/org/apache/esme/lib/TrackMgr.scala
===================================================================
--- src/main/scala/org/apache/esme/lib/TrackMgr.scala    (revision 800477)
+++ src/main/scala/org/apache/esme/lib/TrackMgr.scala    (working copy)
@@ -101,7 +101,7 @@
     }

     bind("main", in,
-         "track" -> text("", addTrack, "id" -> theInput)
+         "track" -> text("", addTrack _, "id" -> theInput)
     )

   }
Index: src/main/scala/org/apache/esme/actor/GroupActor.scala
===================================================================
--- src/main/scala/org/apache/esme/actor/GroupActor.scala    (revision 800477)
+++ src/main/scala/org/apache/esme/actor/GroupActor.scala    (working copy)
@@ -30,6 +30,7 @@

 import org.apache.esme._
 import model._
+import com.twitter.service.{ Stats => ESMEStatistics }

 object GroupActor {
   case class StartMeUp(group: Long)
@@ -42,6 +43,7 @@
     react {
       case StartMeUp =>
         link(ActorWatcher)
+        ESMEStatistics incr "groupCount"
     }
   }
 }
Index: src/main/scala/org/apache/esme/actor/ScalaInterpreter.scala
===================================================================
--- src/main/scala/org/apache/esme/actor/ScalaInterpreter.scala    (revision 800477)
+++ src/main/scala/org/apache/esme/actor/ScalaInterpreter.scala    (working copy)
@@ -31,6 +31,8 @@
 import org.apache.esme.model._
 import net.liftweb.util.{Empty,Props}

+import com.twitter.service.{ Stats => ESMEStatistics }
+
 object ScalaInterpreter extends Actor{

   val settings = new Settings()
@@ -57,6 +59,9 @@
                            Distributor ! Distributor.AddMessageToMailbox(from, msg, InterpreterReason(from))
                          }
                        }
+
+        ESMEStatistics incr "interpreterMessagesCreated"
+        ESMEStatistics incr "messagesCreated"
       }

     }
Index: src/main/scala/org/apache/esme/actor/SchedulerActor.scala
===================================================================
--- src/main/scala/org/apache/esme/actor/SchedulerActor.scala    (revision 800477)
+++ src/main/scala/org/apache/esme/actor/SchedulerActor.scala    (working copy)
@@ -28,6 +28,7 @@
 import org.apache.esme.model._
 import net.liftweb.http.ActorWatcher
 import net.liftweb.util.{Full,Empty,TimeHelpers}
+import com.twitter.service.{ Stats => ESMEStatistics }

 class SchedulerActor(val messageProcessor: Actor, val user: Long, val everySeconds: Int, val reason: MailboxReason) extends Actor {

@@ -64,6 +65,8 @@
                        messageProcessor ! Distributor.AddMessageToMailbox(user, msg, reason)
                      // }
                    }
+     ESMEStatistics incr "schedulerMessagesCreated"
+     ESMEStatistics incr "messagesCreated"
   }

 }
Index: src/main/scala/org/apache/esme/actor/UserActor.scala
===================================================================
--- src/main/scala/org/apache/esme/actor/UserActor.scala    (revision 800477)
+++ src/main/scala/org/apache/esme/actor/UserActor.scala    (working copy)
@@ -36,6 +36,7 @@
 import java.util.logging._
 import java.util.{TimeZone, Calendar}
 import scala.xml.{Elem}
+import com.twitter.service.{ Stats => ESMEStatistics }

 object UserActor {
   private[actor] case class StartMeUp(user: Long)
@@ -106,6 +107,8 @@
         this ! UpdateTracking(Distributor.TrackTrackingType)
         this ! UpdateTracking(Distributor.PerformTrackingType)

+        ESMEStatistics incr "userCount"
+
       case RunFunc(f) =>
         f()

@@ -135,7 +138,9 @@
                  else null

           msg.saveMe
-
+          ESMEStatistics incr "userMessagesCreated"
+          ESMEStatistics incr "messagesCreated"
+
           Distributor ! Distributor.AddMessageToMailbox(userId, msg, NoReason)

           for (id <- followers)
@@ -222,13 +227,20 @@
         val mb = Mailbox.create.user(userId).message(msg)
         reason match {
           case TrackReason(trackId) => mb.viaTrack(trackId)
+               ESMEStatistics incr "messagesDeliveredTrackReason"
           case DirectReason(fromId) => mb.directlyFrom(fromId)
+               ESMEStatistics incr "messagesDeliveredDirectReason"
           case ConversationReason(convId) => mb.conversation(convId)
+               ESMEStatistics incr "messagesDeliveredConversationReason"
           case ResendReason(resender) => mb.resentBy(resender)
+               ESMEStatistics incr "messagesDeliveredResendReason"
+          case RegularReason(id) => ESMEStatistics incr "messagesDeliveredRegularReason"
+          case InterpreterReason(id) => ESMEStatistics incr "messagesDeliveredInterpreterReason"
           case _ =>
         }
         mb.saveMe
-
+        ESMEStatistics incr "messagesDelivered"
+
         _mailbox = ((msg.id.is, reason) :: _mailbox.toList).take(500).toArray

         listeners.foreach(_ ! MessageReceived(msg, reason))
@@ -238,12 +250,14 @@

           td.whatToDo match {
             case m @ MailTo(_, _) =>
-              User.find(userId).foreach( u =>
-                HttpSender ! HttpSender.SendAMessage(m, msg, u, reason, td.uniqueId))
+              User.find(userId).foreach( u => {
+                HttpSender ! HttpSender.SendAMessage(m, msg, u, reason, td.uniqueId)
+                ESMEStatistics incr "messagesMailed"})

             case h @ HttpTo(_, _, _, _, _) =>
-              User.find(userId).foreach( u =>
-                HttpSender ! HttpSender.SendAMessage(h, msg, u, reason, td.uniqueId))
+              User.find(userId).foreach( u => {
+                HttpSender ! HttpSender.SendAMessage(h, msg, u, reason, td.uniqueId)
+                ESMEStatistics incr "messagesSentViaHTTP"})

             case PerformResend =>
               if (! msg.saved_?) msg.save
@@ -256,7 +270,7 @@
             case ScalaInterpret => if (msg.source.is != "scala")
               ScalaInterpreter ! ScalaInterpreter.ScalaExcerpt(userId, msg.id.is, msg.pool.is, msg.getText)

-            case PerformFilter => // IGNORE
+            case PerformFilter => ESMEStatistics incr "messagesFiltered" // IGNORE
           }
         }
       }
Index: src/main/scala/org/apache/esme/actor/Distributor.scala
===================================================================
--- src/main/scala/org/apache/esme/actor/Distributor.scala    (revision 800477)
+++ src/main/scala/org/apache/esme/actor/Distributor.scala    (working copy)
@@ -159,4 +159,8 @@
         ret
     }
   }
+
+  def getUsersCount  = users.size
+  def getGroupsCount = groups.size
+  def getListenersCount = listeners.size
 }
Index: src/main/scala/org/apache/esme/actor/MessagePullActor.scala
===================================================================
--- src/main/scala/org/apache/esme/actor/MessagePullActor.scala    (revision 800477)
+++ src/main/scala/org/apache/esme/actor/MessagePullActor.scala    (working copy)
@@ -25,6 +25,7 @@
 import scala.actors.Actor._
 import net.liftweb.http.ActorWatcher
 import org.apache.esme.actor.Distributor.{UserCreatedMessage=>Msg}
+import com.twitter.service.{ Stats => ESMEStatistics }

 class MessagePullActor(val messageProcessor: Actor, private var lastMessage: Option[Msg], val messageSource: UniqueMessageSource) extends Actor {

@@ -45,6 +46,7 @@
           for (message <- lastMessages) {
             messageProcessor ! message
             lastMessage = Some(message)
+            ESMEStatistics incr "messagesPulled"
           }
         }
         case FetchMessages => actor {
Index: src/main/scala/bootstrap/liftweb/Boot.scala
===================================================================
--- src/main/scala/bootstrap/liftweb/Boot.scala    (revision 800477)
+++ src/main/scala/bootstrap/liftweb/Boot.scala    (working copy)
@@ -43,7 +43,7 @@
 import org.compass.core.config.CompassConfiguration
 import scala.actors.Actor
 import Actor._
-
+import com.twitter.service.{StatsMBean, Stats => ESMEStatistics }
 /**
  * A class that's instantiated early and run.  It allows the application
  * to modify lift's environment
@@ -140,11 +140,18 @@

     LiftRules.early.append(makeUtf8)

+    // register stats gathering object with platform mbean server
+    StatsMBean("org.apache.esme.stats")
+
     Distributor.touch
     SchedulerActor.touch
     MessagePullActor.touch
     ScalaInterpreter.touch

+    ESMEStatistics.makeGauge("users") { Distributor.getUsersCount  }
+    ESMEStatistics.makeGauge("groups") { Distributor.getGroupsCount }
+    ESMEStatistics.makeGauge("listeners") { Distributor.getListenersCount }
+
     Action.findAll(By(Action.disabled, false), By(Action.removed, false)).foreach {
       _.startActors
     }
@@ -265,6 +272,7 @@
     loop {
       react {
         case SessionWatcherInfo(sessions) =>
+          ESMEStatistics.getCounter("liftSessions").set(sessions.size)
           if ((millis - tenMinutes) > lastTime) {
             lastTime = millis
             val rt = Runtime.getRuntime
Index: pom.xml
===================================================================
--- pom.xml    (revision 800477)
+++ pom.xml    (working copy)
@@ -137,6 +137,13 @@
       <scope>provided</scope>
     </dependency>

+    <!-- for stats gathering and stats MBean code can be found at
+    git://github.com/andythedestroyer/scala-stats.git -->
+    <dependency>
+        <groupId>com.twitter.service</groupId>
+        <artifactId>stats</artifactId>
+        <version>1.0</version>
+    </dependency>

     <dependency>
       <groupId>junit</groupId>
