activemq-commits mailing list archives

From chir...@apache.org
Subject svn commit: r1233180 - in /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb: LevelDBClient.scala RecordLog.scala
Date Thu, 19 Jan 2012 05:15:52 GMT
Author: chirino
Date: Thu Jan 19 05:15:52 2012
New Revision: 1233180

URL: http://svn.apache.org/viewvc?rev=1233180&view=rev
Log:
Some follow-on work for APLO-128: improve the synchronization used by the log impl and use better defaults
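
In rough terms, the first hunk below changes log_size handling so that configured values above 2GB are clamped to Int.MaxValue (with a warning) instead of overflowing when narrowed to an Int, and the default stays at 100MB. A minimal standalone sketch of that clamping behaviour, with an illustrative helper in place of MemoryPropertyEditor.parse and the store's logger (names here are not the Apollo API):

// Sketch only: clamp a configured log size (bytes, as a Long) to the
// largest value a signed Int offset can address, warning when it is too big.
object LogSizeClamp {
  val DefaultLogSize = 1024 * 1024 * 100   // 100MB default, matching the patch

  def effectiveLogSize(configuredBytes: Option[Long]): Int = configuredBytes match {
    case Some(size) if size > Int.MaxValue =>
      // the real code routes this warning through the store's logger
      Console.err.println("log_size exceeds the 2G maximum, clamping to Int.MaxValue")
      Int.MaxValue
    case Some(size) => size.toInt
    case None       => DefaultLogSize
  }

  def main(args: Array[String]): Unit = {
    println(effectiveLogSize(Some(3L * 1024 * 1024 * 1024)))  // prints 2147483647
    println(effectiveLogSize(None))                           // prints 104857600
  }
}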

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1233180&r1=1233179&r2=1233180&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Thu Jan 19 05:15:52 2012
@@ -40,6 +40,7 @@ import org.apache.activemq.apollo.dto.Js
 import java.util.Map
 import org.iq80.leveldb._
 import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait._
+import org.apache.activemq.apollo.broker.store.leveldb.RecordLog.LogInfo
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -215,7 +216,14 @@ class LevelDBClient(store: LevelDBStore)
 
   def log_size = {
     import OptionSupport._
-    Option(config.log_size).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(1024 * 1024 * 100)
+    Option(config.log_size).map(MemoryPropertyEditor.parse(_)).map{size=>
+      if(size > Int.MaxValue) {
+        warn("leveldb log_size was configured to be '"+config.log_size+"' but the maximum
supported log size is 2G")
+        Int.MaxValue
+      } else {
+        size.toInt
+      }
+    }.getOrElse(1024 * 1024 * 100)
   }
 
   def start() = {
@@ -243,7 +251,7 @@ class LevelDBClient(store: LevelDBStore)
 
     config.index_max_open_files.foreach( index_options.maxOpenFiles(_) )
     config.index_block_restart_interval.foreach( index_options.blockRestartInterval(_) )
-    index_options.paranoidChecks(config.paranoid_checks.getOrElse(true))
+    index_options.paranoidChecks(config.paranoid_checks.getOrElse(false))
     Option(config.index_write_buffer_size).map(MemoryPropertyEditor.parse(_).toInt).foreach( index_options.writeBufferSize(_) )
     Option(config.index_block_size).map(MemoryPropertyEditor.parse(_).toInt).foreach( index_options.blockSize(_) )
     Option(config.index_compression).foreach(x => index_options.compressionType( x match {
@@ -254,7 +262,7 @@ class LevelDBClient(store: LevelDBStore)
 
     index_options.cacheSize(Option(config.index_cache_size).map(MemoryPropertyEditor.parse(_).toLong).getOrElse(1024*1024*256L))
     index_options.logger(new Logger() {
-      def log(msg: String) = debug(store.store_kind+": "+msg)
+      def log(msg: String) = debug(store.store_kind+": "+msg.stripSuffix("\n"))
     })
 
     log = create_log
@@ -513,7 +521,7 @@ class LevelDBClient(store: LevelDBStore)
       } catch {
         case e:Throwable =>
           if( error==null ) {
-            warn(e, "DB operation failed. (entering recovery mode)")
+            warn(e, "DB operation failed. (entering recovery mode): "+e)
           }
           error = e
       }
@@ -880,7 +888,7 @@ class LevelDBClient(store: LevelDBStore)
     }
   }
   
-  case class UsageCounter(info:RecordLog#LogInfo) {
+  case class UsageCounter(info:LogInfo) {
     var count = 0L
     var size = 0L
     var first_reference_queue:QueueRecord = _

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1233180&r1=1233179&r2=1233180&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Thu Jan 19 05:15:52 2012
@@ -22,12 +22,12 @@ import java.{util=>ju}
 import java.util.zip.CRC32
 import java.util.Map.Entry
 import collection.immutable.TreeMap
-import org.fusesource.hawtdispatch.BaseRetained
 import java.util.concurrent.atomic.AtomicLong
 import java.io._
 import org.apache.activemq.apollo.util.FileSupport._
 import org.apache.activemq.apollo.util.{Log, LRUCache}
 import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
+import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
 
 object RecordLog extends Log {
 
@@ -42,7 +42,13 @@ object RecordLog extends Log {
 
   val LOG_HEADER_PREFIX = '*'.toByte
   val LOG_HEADER_SIZE = 10
-  val BUFFER_SIZE = 1024
+
+  val BUFFER_SIZE = 1024*512
+  val BYPASS_BUFFER_SIZE = 1024*16
+
+  case class LogInfo(file:File, position:Long, length:Long) {
+    def limit = position+length
+  }
 }
 
 case class RecordLog(directory: File, logSuffix:String) {
@@ -55,9 +61,6 @@ case class RecordLog(directory: File, lo
   var paranoidChecks = false
   var sync = false
 
-  case class LogInfo(file:File, position:Long, length:AtomicLong) {
-    def limit = position+length.get
-  }
 
   var log_infos = TreeMap[Long, LogInfo]()
   object log_mutex
@@ -65,7 +68,7 @@ case class RecordLog(directory: File, lo
   def delete(id:Long) = {
     log_mutex.synchronized {
       // We can't delete the current appender.
-      if( current_appender.start != id ) {
+      if( current_appender.position != id ) {
         log_infos.get(id).foreach { info =>
           onDelete(info.file)
           log_infos = log_infos.filterNot(_._1 == id)
@@ -84,7 +87,20 @@ case class RecordLog(directory: File, lo
     (checksum.getValue & 0xFFFFFFFF).toInt
   }
 
-  class LogAppender(file:File, start:Long) extends LogReader(file, start) {
+  var write_thread:Thread = _
+  def is_write_thread_executing = if(write_thread==null) {
+    write_thread = Thread.currentThread()
+    true
+  } else {
+    write_thread eq Thread.currentThread() 
+  }
+
+  def assert_on_write_thread = if ( !is_write_thread_executing) {
+    val current: Thread = Thread.currentThread()
+    throw new Exception("current: "+current.getName+", expected: "+write_thread.getName)
+  }
+  
+  class LogAppender(file:File, position:Long) extends LogReader(file, position) {
 
     override def open = new RandomAccessFile(file, "rw")
 
@@ -93,9 +109,13 @@ case class RecordLog(directory: File, lo
       super.dispose()
     }
 
-    val length = new AtomicLong(0)
-    
-    def limit = start+length.get()
+    var append_offset = 0L
+    val flushed_offset = new AtomicLong(0)
+
+    def append_position = {
+      assert_on_write_thread
+      position+append_offset
+    }
 
     // set the file size ahead of time so that we don't have to sync the file
     // meta-data on every log sync.
@@ -104,101 +124,131 @@ case class RecordLog(directory: File, lo
     channel.force(true)
     channel.position(0)
 
-    val os = new DataByteArrayOutputStream((BUFFER_SIZE)+LOG_HEADER_PREFIX)
+    val write_buffer = new DataByteArrayOutputStream((BUFFER_SIZE)+BUFFER_SIZE)
 
     def force = {
       // only need to update the file metadata if the file size changes..
+      assert_on_write_thread
       flush
       if(sync) {
-        channel.force(length.get() > logSize)
-      }
-    }
-
-    def flush = {
-      if( os.position() > 0 ) {
-        val buffer = os.toBuffer.toByteBuffer
-        val pos = length.get()-buffer.remaining
-        trace("wrote at "+pos+" "+os.toBuffer)
-        channel.write(buffer, pos)
-        if( buffer.hasRemaining ) {
-          throw new IOException("Short write")
-        }
-        os.reset()
+        channel.force(append_offset > logSize)
       }
     }
 
     /**
      * returns the offset position of the data record.
      */
-    def append(id:Byte, data: Buffer): Long = {
-      val rc = limit
+    def append(id:Byte, data: Buffer): Long = this.synchronized {
+      assert_on_write_thread
+      val record_position = append_position
       val data_length = data.length
       val total_length = LOG_HEADER_SIZE + data_length
       
-      if( os.position() + total_length > BUFFER_SIZE ) {
+      if( write_buffer.position() + total_length > BUFFER_SIZE ) {
         flush
       }
 
-      if( total_length > (BUFFER_SIZE<<2) ) {
+      val cs: Int = checksum(data)
+//      trace("Writing at: "+record_position+" len: "+data_length+" with checksum: "+cs)
+
+      if( total_length > BYPASS_BUFFER_SIZE ) {
 
         // Write the header and flush..
-        os.writeByte(LOG_HEADER_PREFIX)
-        os.writeByte(id)
-        os.writeInt(checksum(data))
-        os.writeInt(data_length)
+        write_buffer.writeByte(LOG_HEADER_PREFIX)
+        write_buffer.writeByte(id)
+        write_buffer.writeInt(cs)
+        write_buffer.writeInt(data_length)
 
-        length.addAndGet(LOG_HEADER_PREFIX)
+        append_offset += LOG_HEADER_SIZE
         flush
 
         // Directly write the data to the channel since it's large.
         val buffer = data.toByteBuffer
-        val pos = length.get()+LOG_HEADER_PREFIX
-        trace("wrote at "+pos+" "+data)
+        val pos = append_offset+LOG_HEADER_SIZE
+        flushed_offset.addAndGet(buffer.remaining)
         channel.write(buffer, pos)
         if( buffer.hasRemaining ) {
           throw new IOException("Short write")
         }
-        length.addAndGet(data_length)
+        append_offset += data_length
 
       } else {
-        os.writeByte(LOG_HEADER_PREFIX)
-        os.writeByte(id)
-        os.writeInt(checksum(data))
-        os.writeInt(data_length)
-        os.write(data.data, data.offset, data_length)
-        length.addAndGet(total_length)
+        write_buffer.writeByte(LOG_HEADER_PREFIX)
+        write_buffer.writeByte(id)
+        write_buffer.writeInt(cs)
+        write_buffer.writeInt(data_length)
+        write_buffer.write(data.data, data.offset, data_length)
+        append_offset += total_length
+      }
+      record_position
+    }
+
+    def flush = this.synchronized {
+      assert_on_write_thread
+      if( write_buffer.position() > 0 ) {
+        val buffer = write_buffer.toBuffer.toByteBuffer
+        val pos = append_offset-buffer.remaining
+        flushed_offset.addAndGet(buffer.remaining)
+        channel.write(buffer, pos)
+        if( buffer.hasRemaining ) {
+          throw new IOException("Short write")
+        }
+        write_buffer.reset()
+      }
+    }
+
+//    override def read(record_position: Long, length: Int) = this.synchronized {
+//      super.read(record_position, length)
+//    }
+//
+//    override def read(record_position: Long) = this.synchronized {
+//      super.read(record_position)
+//    }
+//
+//    override def check(record_position: Long) = this.synchronized {
+//      super.check(record_position)
+//    }
+
+    override def check_read_flush(end_offset:Int) = {
+      if( flushed_offset.get() < end_offset )  {
+        this.synchronized {
+          println("read flush")
+          flush
+        }
       }
-      rc
     }
 
   }
 
-  case class LogReader(file:File, start:Long) extends BaseRetained {
-    
-    val fd = open
+  case class LogReader(file:File, position:Long) extends BaseRetained {
 
     def open = new RandomAccessFile(file, "r")
-    
-    def channel = fd.getChannel
+
+    val fd = open
+    val channel = fd.getChannel
 
     override def dispose() {
       fd.close()
     }
 
-    def read(pos:Long, length:Int) = this.synchronized {
-      val offset = (pos-start).toInt
+    def check_read_flush(end_offset:Int) = {}
+    
+    def read(record_position:Long, length:Int) = {
+      val offset = (record_position-position).toInt
+      check_read_flush(offset+LOG_HEADER_SIZE+length)
+      
       if(paranoidChecks) {
+
         val record = new Buffer(LOG_HEADER_SIZE+length)
+
         if( channel.read(record.toByteBuffer, offset) != record.length ) {
-          val data2 = new Buffer(LOG_HEADER_SIZE+length)
-          channel.read(data2.toByteBuffer, offset)
-          throw new IOException("short record at position: "+pos+" in file: "+file+", offset:
"+offset)
+          throw new IOException("short record at position: "+record_position+" in file: "+file+",
offset: "+offset)
         }
 
         val is = new DataByteArrayInputStream(record)
         val prefix = is.readByte()
         if( prefix != LOG_HEADER_PREFIX ) {
-          throw new IOException("invalid record at position: "+pos+" in file: "+file+", offset:
"+offset)
+          throw new IOException("invalid record at position: "+record_position+" in file:
"+file+", offset: "+offset)
         }
 
         val id = is.readByte()
@@ -209,7 +259,7 @@ case class RecordLog(directory: File, lo
         // If your reading the whole record we can verify the data checksum
         if( expectedLength == length ) {
           if( expectedChecksum != checksum(data) ) {
-            throw new IOException("checksum does not match at position: "+pos+" in file:
"+file+", offset: "+offset)
+            throw new IOException("checksum does not match at position: "+record_position+"
in file: "+file+", offset: "+offset)
           }
         }
 
@@ -217,14 +267,14 @@ case class RecordLog(directory: File, lo
       } else {
         val data = new Buffer(length)
         if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) != data.length ) {
-          throw new IOException("short record at position: "+pos+" in file: "+file+", offset:
"+offset)
+          throw new IOException("short record at position: "+record_position+" in file: "+file+",
offset: "+offset)
         }
         data
       }
     }
 
-    def read(pos:Long) = this.synchronized {
-      val offset = (pos-start).toInt
+    def read(record_position:Long) = {
+      val offset = (record_position-position).toInt
       val header = new Buffer(LOG_HEADER_SIZE)
       channel.read(header.toByteBuffer, offset)
       val is = header.bigEndianEditor();
@@ -247,11 +297,11 @@ case class RecordLog(directory: File, lo
           throw new IOException("checksum does not match")
         }
       }
-      (id, data, pos+LOG_HEADER_SIZE+length)
+      (id, data, record_position+LOG_HEADER_SIZE+length)
     }
 
-    def check(pos:Long):Option[Long] = this.synchronized {
-      var offset = (pos-start).toInt
+    def check(record_position:Long):Option[Long] = {
+      var offset = (record_position-position).toInt
       val header = new Buffer(LOG_HEADER_SIZE)
       channel.read(header.toByteBuffer, offset)
       val is = header.bigEndianEditor();
@@ -289,12 +339,12 @@ case class RecordLog(directory: File, lo
       if( expectedChecksum !=  checksum ) {
         return None
       }
-      return Some(pos+LOG_HEADER_SIZE+length)
+      return Some(record_position+LOG_HEADER_SIZE+length)
     }
 
-    def verifyAndGetEndPosition:Long = this.synchronized {
-      var pos = start;
-      val limit = start+channel.size()
+    def verifyAndGetEndPosition:Long = {
+      var pos = position;
+      val limit = position+channel.size()
       while(pos < limit) {
         check(pos) match {
           case Some(next) => pos = next
@@ -310,16 +360,20 @@ case class RecordLog(directory: File, lo
   }
 
   def create_appender(position: Long): Any = {
-    current_appender = create_log_appender(position)
+    assert_on_write_thread
     log_mutex.synchronized {
-      log_infos += position -> new LogInfo(current_appender.file, position, current_appender.length)
+      if(current_appender!=null) {
+        log_infos += position -> new LogInfo(current_appender.file, current_appender.position, current_appender.append_offset)
+      }
+      current_appender = create_log_appender(position)
+      log_infos += position -> new LogInfo(current_appender.file, position, 0)
     }
   }
 
   def open = {
     log_mutex.synchronized {
       log_infos = LevelDBClient.find_sequence_files(directory, logSuffix).map { case (position,file) =>
-        position -> LogInfo(file, position, new AtomicLong(file.length()))
+        position -> LogInfo(file, position, file.length())
       }
 
       val appendPos = if( log_infos.isEmpty ) {
@@ -328,13 +382,14 @@ case class RecordLog(directory: File, lo
         val (_, file) = log_infos.last
         val r = LogReader(file.file, file.position)
         try {
-          val rc = r.verifyAndGetEndPosition
-          file.length.set(rc - file.position)
-          if( file.file.length != file.length.get() ) {
+          val actualLength = r.verifyAndGetEndPosition
+          val updated = file.copy(length = actualLength - file.position)
+          log_infos = log_infos + (updated.position->updated)
+          if( updated.file.length != file.length ) {
             // we need to truncate.
-            using(new RandomAccessFile(file.file, "rw")) ( _.setLength(file.length.get()) )
+            using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length))
           }
-          rc
+          actualLength
         } finally {
           r.release()
         }
@@ -350,8 +405,8 @@ case class RecordLog(directory: File, lo
     }
   }
 
-  def appender_limit = current_appender.limit
-  def appender_start = current_appender.start
+  def appender_limit = current_appender.append_position
+  def appender_start = current_appender.position
 
   def next_log(position:Long) = LevelDBClient.create_sequence_file(directory, position, logSuffix)
 
@@ -361,10 +416,11 @@ case class RecordLog(directory: File, lo
     } finally {
       current_appender.flush
       log_mutex.synchronized {
-        if ( current_appender.length.get >= logSize ) {
+        assert_on_write_thread
+        if ( current_appender.append_offset >= logSize ) {
           current_appender.release()
           on_log_rotate()
-          create_appender(current_appender.limit)
+          create_appender(current_appender.append_position)
         }
       }
     }
@@ -380,12 +436,12 @@ case class RecordLog(directory: File, lo
 
   def log_info(pos:Long) = log_mutex.synchronized(log_infos.range(0L, pos+1).lastOption.map(_._2))
 
-  private def get_reader[T](pos:Long)(func: (LogReader)=>T) = {
+  private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = {
 
     val lookup = log_mutex.synchronized {
-      val info = log_info(pos)
+      val info = log_info(record_position)
       info.map { info=>
-        if(info.position == current_appender.start) {
+        if(info.position == current_appender.position) {
           current_appender.retain()
           (info, current_appender)
         } else {



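As a rough illustration of the synchronization approach in the RecordLog changes above: appends are pinned to a single designated write thread (the AtomicLong file length is replaced by a plain append_offset that only that thread touches), while readers consult an atomically updated flushed_offset and only force a buffer flush when they need bytes that have not yet reached the file. A standalone sketch under those assumptions follows; the class and method names are illustrative, not the Apollo API:

import java.util.concurrent.atomic.AtomicLong

// Illustrative sketch (not the Apollo code) of two ideas from the commit:
// a single designated write thread, and a flushed_offset that lets readers
// avoid synchronizing unless the data they want is still in the write buffer.
class SingleWriterLog {
  @volatile private var writeThread: Thread = _
  private val flushedOffset = new AtomicLong(0) // bytes known to be in the file

  // The first thread to write becomes the designated write thread.
  private def isWriteThreadExecuting: Boolean =
    if (writeThread == null) { writeThread = Thread.currentThread(); true }
    else writeThread eq Thread.currentThread()

  // Any later write from a different thread fails fast with a descriptive error.
  def assertOnWriteThread(): Unit =
    if (!isWriteThreadExecuting)
      throw new IllegalStateException(
        "current: " + Thread.currentThread().getName +
        ", expected: " + writeThread.getName)

  // Called by the write thread once buffered bytes have reached the channel.
  def recordFlushed(bytes: Int): Unit = {
    assertOnWriteThread()
    flushedOffset.addAndGet(bytes)
  }

  // Called by readers: only synchronize and flush when the requested range
  // extends past what has already been written to the file.
  def checkReadFlush(endOffset: Long)(flush: => Unit): Unit =
    if (flushedOffset.get() < endOffset) synchronized { flush }
}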