activemq-commits mailing list archives

From: chir...@apache.org
Subject: svn commit: r1233145 - in /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb: dto/LevelDBStoreDTO.java leveldb/LevelDBClient.scala leveldb/RecordLog.scala
Date: Thu, 19 Jan 2012 00:59:11 GMT
Author: chirino
Date: Thu Jan 19 00:59:10 2012
New Revision: 1233145

URL: http://svn.apache.org/viewvc?rev=1233145&view=rev
Log:
Fixes APLO-128: Simplify the leveldb-store's log record format so that every record has a
checksum that can be used to quickly validate all read data, in case you're paranoid about
data corruption in your FS.
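
For reference, the simplified layout this change introduces is: a one byte '*' magic, a one byte
kind, a CRC32 checksum of the data, a four byte data length, and then the data itself (a 10 byte
header, LOG_HEADER_SIZE). The following is a minimal standalone sketch of that layout using only
JDK classes (DataOutputStream, CRC32) instead of the hawtbuf Buffer types the store actually
uses; the object and helper names here are illustrative, not part of the patch.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, IOException}
import java.util.zip.CRC32

object RecordFormatSketch {

  val LOG_HEADER_PREFIX = '*'.toByte
  val LOG_HEADER_SIZE = 10 // prefix(1) + kind(1) + checksum(4) + length(4)

  def checksum(data: Array[Byte]): Int = {
    val crc = new CRC32
    crc.update(data, 0, data.length)
    (crc.getValue & 0xFFFFFFFF).toInt
  }

  // Encode a single record: 10 byte header followed by the data bytes.
  def encode(kind: Byte, data: Array[Byte]): Array[Byte] = {
    val baos = new ByteArrayOutputStream(LOG_HEADER_SIZE + data.length)
    val os = new DataOutputStream(baos)
    os.writeByte(LOG_HEADER_PREFIX)
    os.writeByte(kind)
    os.writeInt(checksum(data))
    os.writeInt(data.length)
    os.write(data)
    os.flush()
    baos.toByteArray
  }

  // Decode a record and, when paranoid, verify the data against the stored checksum.
  def decode(record: Array[Byte], paranoidChecks: Boolean = true): (Byte, Array[Byte]) = {
    val is = new DataInputStream(new ByteArrayInputStream(record))
    if (is.readByte() != LOG_HEADER_PREFIX) throw new IOException("invalid record")
    val kind = is.readByte()
    val expectedChecksum = is.readInt()
    val length = is.readInt()
    val data = new Array[Byte](length)
    is.readFully(data)
    if (paranoidChecks && expectedChecksum != checksum(data))
      throw new IOException("checksum does not match")
    (kind, data)
  }

  def main(args: Array[String]): Unit = {
    val (kind, data) = decode(encode(1.toByte, "hello".getBytes("UTF-8")))
    println("kind=" + kind + " data=" + new String(data, "UTF-8"))
  }
}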

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java?rev=1233145&r1=1233144&r2=1233145&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java Thu Jan 19 00:59:10 2012
@@ -49,9 +49,6 @@ public class LevelDBStoreDTO extends Sto
     @XmlAttribute(name="log_size")
     public String log_size;
 
-    @XmlAttribute(name="log__write_buffer_size")
-    public String log_write_buffer_size;
-
     @XmlAttribute(name="index_max_open_files")
     public Integer index_max_open_files;
 
@@ -96,8 +93,6 @@ public class LevelDBStoreDTO extends Sto
             return false;
         if (index_factory != null ? !index_factory.equals(that.index_factory) : that.index_factory != null) return false;
         if (log_size != null ? !log_size.equals(that.log_size) : that.log_size != null) return false;
-        if (log_write_buffer_size != null ? !log_write_buffer_size.equals(that.log_write_buffer_size) : that.log_write_buffer_size != null)
-            return false;
         if (paranoid_checks != null ? !paranoid_checks.equals(that.paranoid_checks) : that.paranoid_checks != null)
             return false;
         if (read_threads != null ? !read_threads.equals(that.read_threads) : that.read_threads != null) return false;
@@ -118,7 +113,6 @@ public class LevelDBStoreDTO extends Sto
         result = 31 * result + (paranoid_checks != null ? paranoid_checks.hashCode() : 0);
         result = 31 * result + (verify_checksums != null ? verify_checksums.hashCode() : 0);
         result = 31 * result + (log_size != null ? log_size.hashCode() : 0);
-        result = 31 * result + (log_write_buffer_size != null ? log_write_buffer_size.hashCode() : 0);
         result = 31 * result + (index_max_open_files != null ? index_max_open_files.hashCode() : 0);
         result = 31 * result + (index_block_restart_interval != null ? index_block_restart_interval.hashCode() : 0);
         result = 31 * result + (index_write_buffer_size != null ? index_write_buffer_size.hashCode() : 0);

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1233145&r1=1233144&r2=1233145&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Thu Jan 19 00:59:10 2012
@@ -172,6 +172,9 @@ class LevelDBClient(store: LevelDBStore)
   import FileSupport._
 
   def dispatchQueue = store.dispatch_queue
+  
+  implicit def toByteArray(buf:Buffer):Array[Byte] = buf.toByteArray
+  implicit def toBuffer(buf:Array[Byte]):Buffer = new Buffer(buf)
 
   /////////////////////////////////////////////////////////////////////
   //
@@ -240,7 +243,7 @@ class LevelDBClient(store: LevelDBStore)
 
     config.index_max_open_files.foreach( index_options.maxOpenFiles(_) )
     config.index_block_restart_interval.foreach( index_options.blockRestartInterval(_) )
-    config.paranoid_checks.foreach( index_options.paranoidChecks(_) )
+    index_options.paranoidChecks(config.paranoid_checks.getOrElse(true))
     Option(config.index_write_buffer_size).map(MemoryPropertyEditor.parse(_).toInt).foreach( index_options.writeBufferSize(_) )
     Option(config.index_block_size).map(MemoryPropertyEditor.parse(_).toInt).foreach( index_options.blockSize(_) )
     Option(config.index_compression).foreach(x => index_options.compressionType( x match {
@@ -255,8 +258,9 @@ class LevelDBClient(store: LevelDBStore)
     })
 
     log = create_log
-    log.write_buffer_size = Option(config.log_write_buffer_size).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(1024*1024*4)
-    log.log_size = log_size
+    log.sync = sync
+    log.logSize = log_size
+    log.paranoidChecks = index_options.paranoidChecks()
     log.on_log_rotate = ()=> {
       // lets queue a request to checkpoint when
       // the logs rotate.. queue it on the GC thread since GC's lock
@@ -660,7 +664,7 @@ class LevelDBClient(store: LevelDBStore)
         }
         if( sync_needed && sync ) {
           appender.flush
-          appender.sync
+          appender.force
         }
       }
     }
@@ -711,6 +715,7 @@ class LevelDBClient(store: LevelDBStore)
               log.read(pos, len).map { data =>
                 val rc:MessageRecord = data
                 rc.locator = new AtomicReference[Array[Byte]](locator_data)
+                assert( rc.protocol!=null )
                 rc
               }
             }
@@ -741,6 +746,7 @@ class LevelDBClient(store: LevelDBStore)
               log.read(pos, len).map { data =>
                 val rc:MessageRecord = data
                 rc.locator = new AtomicReference[Array[Byte]](locator_data)
+                assert( rc.protocol!=null )
                 rc
               }
             }

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1233145&r1=1233144&r2=1233145&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Thu Jan 19 00:59:10 2012
@@ -19,53 +19,41 @@ package org.apache.activemq.apollo.broke
 import java.{lang=>jl}
 import java.{util=>ju}
 
-import org.apache.activemq.apollo.util._
-import org.fusesource.hawtbuf.DataByteArrayOutputStream
-import java.io._
 import java.util.zip.CRC32
 import java.util.Map.Entry
-import java.util.Arrays
-import collection.mutable.{HashMap, HashSet}
 import collection.immutable.TreeMap
+import org.fusesource.hawtdispatch.BaseRetained
 import java.util.concurrent.atomic.AtomicLong
-import java.nio.ByteBuffer
+import java.io._
+import org.apache.activemq.apollo.util.FileSupport._
+import org.apache.activemq.apollo.util.{Log, LRUCache}
+import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
 
-object RecordLog {
+object RecordLog extends Log {
 
   // The log files contain a sequence of variable length log records:
-  // record :=
-  //   '*L'     : int8*2     // 2 byte constant
+  // record := header + data
+  //
+  // header :=
+  //   '*'      : int8       // Start of Record Magic
+  //   kind     : int8       // Help identify content type of the data.
   //   checksum : uint32     // crc32c of the data[]
   //   length   : uint32     // the length of the data
-  //   data     : int8*length
-  //
-  // The log records are used to aggregate multiple data records
-  // as a single write to the file system.
-
-  //
-  // The data is composed of multiple records too:
-  // data :=
-  //   kind     : int8
-  //   length   : varInt
-  //   body     : int8*length
-  //
-  // The kind field is an aid to the app layer.  It cannot be set to
-  // '*'.
-
-  val LOG_HEADER_PREFIX = Array('*', 'L').map(_.toByte)
-  val LOG_HEADER_SIZE = 10 // BATCH_HEADER_PREFIX (2) + checksum (4) + length (4)
 
+  val LOG_HEADER_PREFIX = '*'.toByte
+  val LOG_HEADER_SIZE = 10
+  val BUFFER_SIZE = 1024
 }
 
-case class RecordLog(directory: File, log_suffix:String) {
-  import FileSupport._
+case class RecordLog(directory: File, logSuffix:String) {
   import RecordLog._
 
   directory.mkdirs()
 
-  var write_buffer_size = 1024 * 1024 * 4
-  var log_size = 1024 * 1024 * 100
-  private var current_appender:LogAppender = _
+  var logSize = 1024 * 1024 * 100
+  var current_appender:LogAppender = _
+  var paranoidChecks = false
+  var sync = false
 
   case class LogInfo(file:File, position:Long, length:AtomicLong) {
     def limit = position+length.get
@@ -79,165 +67,241 @@ case class RecordLog(directory: File, lo
       // We can't delete the current appender.
       if( current_appender.start != id ) {
         log_infos.get(id).foreach { info =>
-          on_delete(info.file)
+          onDelete(info.file)
           log_infos = log_infos.filterNot(_._1 == id)
         }
       }
     }
   }
 
-  protected def on_delete(file:File) = {
+  protected def onDelete(file:File) = {
     file.delete()
   }
 
-  class LogAppender(val file:File, val start:Long) {
+  def checksum(data: Buffer): Int = {
+    val checksum = new CRC32
+    checksum.update(data.data, data.offset, data.length)
+    (checksum.getValue & 0xFFFFFFFF).toInt
+  }
 
-    val fos = new FileOutputStream(file)
-    def channel = fos.getChannel
-    def os:OutputStream = fos
+  class LogAppender(file:File, start:Long) extends LogReader(file, start) {
 
-    val outbound = new DataByteArrayOutputStream()
+    override def open = new RandomAccessFile(file, "rw")
+
+    override def dispose() = {
+      force
+      super.dispose()
+    }
 
-    var batch_length = 0
     val length = new AtomicLong(0)
-    var limit = start
+    
+    def limit = start+length.get()
 
     // set the file size ahead of time so that we don't have to sync the file
     // meta-data on every log sync.
-    channel.position(log_size)
-    channel.write(ByteBuffer.wrap(Array(0.toByte)))
+    channel.position(logSize-1)
+    channel.write(new Buffer(1).toByteBuffer)
     channel.force(true)
     channel.position(0)
 
-    def sync = {
+    val os = new DataByteArrayOutputStream((BUFFER_SIZE)+LOG_HEADER_PREFIX)
+
+    def force = {
       // only need to update the file metadata if the file size changes..
-      channel.force(length.get() > log_size)
+      flush
+      if(sync) {
+        channel.force(length.get() > logSize)
+      }
     }
 
-    def flush {
-      if( batch_length!= 0 ) {
-
-        // Update the buffer with the log header info now that we
-        // can calc the length and checksum info
-        val buffer = outbound.toBuffer
-
-        assert(buffer.length()==LOG_HEADER_SIZE+batch_length)
-
-        outbound.reset()
-        outbound.write(LOG_HEADER_PREFIX)
-
-        val checksum = new CRC32
-        checksum.update(buffer.data, buffer.offset + LOG_HEADER_SIZE, buffer.length - LOG_HEADER_SIZE)
-        var actual_checksum = (checksum.getValue & 0xFFFFFFFF).toInt
-
-        outbound.writeInt( actual_checksum )
-        outbound.writeInt(batch_length)
-
-        // Actually write the record to the file..
-        buffer.writeTo(os);
-
-        length.addAndGet( buffer.length() )
-
-        batch_length = 0
-        outbound.reset()
+    def flush = {
+      if( os.position() > 0 ) {
+        val buffer = os.toBuffer.toByteBuffer
+        val pos = length.get()-buffer.remaining
+        trace("wrote at "+pos+" "+os.toBuffer)
+        channel.write(buffer, pos)
+        if( buffer.hasRemaining ) {
+          throw new IOException("Short write")
+        }
+        os.reset()
       }
     }
 
     /**
      * returns the offset position of the data record.
      */
-    def append(id:Byte, data: Array[Byte]): Long = {
-      assert(id != LOG_HEADER_PREFIX(0))
-      if( batch_length!=0 && (batch_length + data.length > write_buffer_size) ) {
+    def append(id:Byte, data: Buffer): Long = {
+      val rc = limit
+      val data_length = data.length
+      val total_length = LOG_HEADER_SIZE + data_length
+      
+      if( os.position() + total_length > BUFFER_SIZE ) {
         flush
       }
-      if( batch_length==0 ) {
-        // first data pos record is offset by the log header.
-        outbound.skip(LOG_HEADER_SIZE);
-        limit += LOG_HEADER_SIZE
-      }
-      val rc = limit;
-
-      val start = outbound.position
-      outbound.writeByte(id);
-      outbound.writeInt(data.length)
-      outbound.write(data);
-      val count = outbound.position - start
 
-      limit += count
-      batch_length += count
-      rc
-    }
+      if( total_length > (BUFFER_SIZE<<2) ) {
 
-    def close = {
-      flush
-      channel.truncate(length.get())
-      os.close()
-    }
-  }
+        // Write the header and flush..
+        os.writeByte(LOG_HEADER_PREFIX)
+        os.writeByte(id)
+        os.writeInt(checksum(data))
+        os.writeInt(data_length)
 
-  case class LogReader(file:File, start:Long) {
+        length.addAndGet(LOG_HEADER_PREFIX)
+        flush
 
-    val is = new RandomAccessFile(file, "r")
+        // Directly write the data to the channel since it's large.
+        val buffer = data.toByteBuffer
+        val pos = length.get()+LOG_HEADER_PREFIX
+        trace("wrote at "+pos+" "+data)
+        channel.write(buffer, pos)
+        if( buffer.hasRemaining ) {
+          throw new IOException("Short write")
+        }
+        length.addAndGet(data_length)
 
-    def read(pos:Long) = this.synchronized {
-      is.seek(pos-start)
-      val id = is.read()
-      if( id == LOG_HEADER_PREFIX(0) ) {
-        (id, null, pos+LOG_HEADER_SIZE)
       } else {
-        val length = is.readInt()
-        val data = new Array[Byte](length)
-        is.readFully(data)
-        (id, data, start+is.getFilePointer)
+        os.writeByte(LOG_HEADER_PREFIX)
+        os.writeByte(id)
+        os.writeInt(checksum(data))
+        os.writeInt(data_length)
+        os.write(data.data, data.offset, data_length)
+        length.addAndGet(total_length)
       }
+      rc
+    }
+
+  }
+
+  case class LogReader(file:File, start:Long) extends BaseRetained {
+    
+    val fd = open
+
+    def open = new RandomAccessFile(file, "r")
+    
+    def channel = fd.getChannel
+
+    override def dispose() {
+      fd.close()
     }
 
     def read(pos:Long, length:Int) = this.synchronized {
-      is.seek((pos-start)+5)
-      val data = new Array[Byte](length)
-      is.readFully(data)
-      data
+      val offset = (pos-start).toInt
+      if(paranoidChecks) {
+        val record = new Buffer(LOG_HEADER_SIZE+length)
+        if( channel.read(record.toByteBuffer, offset) != record.length ) {
+          val data2 = new Buffer(LOG_HEADER_SIZE+length)
+          channel.read(data2.toByteBuffer, offset)
+          throw new IOException("short record at position: "+pos+" in file: "+file+", offset:
"+offset)
+        }
+
+        val is = new DataByteArrayInputStream(record)
+        val prefix = is.readByte()
+        if( prefix != LOG_HEADER_PREFIX ) {
+          throw new IOException("invalid record at position: "+pos+" in file: "+file+", offset:
"+offset)
+        }
+
+        val id = is.readByte()
+        val expectedChecksum = is.readInt()
+        val expectedLength = is.readInt()
+        val data = is.readBuffer(length)
+
+        // If you're reading the whole record we can verify the data checksum
+        if( expectedLength == length ) {
+          if( expectedChecksum != checksum(data) ) {
+            throw new IOException("checksum does not match at position: "+pos+" in file:
"+file+", offset: "+offset)
+          }
+        }
+
+        data
+      } else {
+        val data = new Buffer(length)
+        if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) != data.length ) {
+          throw new IOException("short record at position: "+pos+" in file: "+file+", offset:
"+offset)
+        }
+        data
+      }
     }
 
-    def close = this.synchronized {
-      is.close()
+    def read(pos:Long) = this.synchronized {
+      val offset = (pos-start).toInt
+      val header = new Buffer(LOG_HEADER_SIZE)
+      channel.read(header.toByteBuffer, offset)
+      val is = header.bigEndianEditor();
+      val prefix = is.readByte()
+      if( prefix != LOG_HEADER_PREFIX ) {
+        // Does not look like a record.
+        throw new IOException("invalid record position")
+      }
+      val id = is.readByte()
+      val expectedChecksum = is.readInt()
+      val length = is.readInt()
+      val data = new Buffer(length)
+
+      if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) != length ) {
+        throw new IOException("short record")
+      }
+
+      if(paranoidChecks) {
+        if( expectedChecksum != checksum(data) ) {
+          throw new IOException("checksum does not match")
+        }
+      }
+      (id, data, pos+LOG_HEADER_SIZE+length)
     }
 
-    def next_position(verify_checksums:Boolean=true):Long = this.synchronized {
-      var offset = 0;
-      val prefix = new Array[Byte](LOG_HEADER_PREFIX.length)
-      var done = false
-      while(!done) {
-        try {
-          is.seek(offset)
-          is.readFully(prefix)
-          if( !Arrays.equals(prefix, LOG_HEADER_PREFIX) ) {
-            throw new IOException("Missing header prefix");
-          }
-          val expected_checksum = is.readInt();
+    def check(pos:Long):Option[Long] = this.synchronized {
+      var offset = (pos-start).toInt
+      val header = new Buffer(LOG_HEADER_SIZE)
+      channel.read(header.toByteBuffer, offset)
+      val is = header.bigEndianEditor();
+      val prefix = is.readByte()
+      if( prefix != LOG_HEADER_PREFIX ) {
+        return None // Does not look like a record.
+      }
+      val id = is.readByte()
+      val expectedChecksum = is.readInt()
+      val length = is.readInt()
+
+      val chunk = new Buffer(1024*4)
+      val chunkbb = chunk.toByteBuffer
+      offset += LOG_HEADER_SIZE
+
+      // Read the data in chunks to avoid
+      // OOME if we are checking an invalid record
+      // with a bad record length
+      val checksumer = new CRC32
+      var remaining = length
+      while( remaining > 0 ) {
+        val chunkSize = remaining.min(1024*4);
+        chunkbb.position(0)
+        chunkbb.limit(chunkSize)
+        channel.read(chunkbb, offset)
+        if( chunkbb.hasRemaining ) {
+          return None
+        }
+        checksumer.update(chunk.data, 0, chunkSize)
+        offset += chunkSize
+        remaining -= chunkSize
+      }
 
-          val length = is.readInt();
-          if (verify_checksums) {
-            val data = new Array[Byte](length)
-            is.readFully(data)
-
-            val checksum = new CRC32
-            checksum.update(data)
-            val actual_checksum = (checksum.getValue & 0xFFFFFFFF).toInt
-
-            if( expected_checksum != actual_checksum ) {
-              throw new IOException("Data checksum missmatch");
-            }
-          }
-          offset += LOG_HEADER_SIZE + length
+      val checksum = ( checksumer.getValue & 0xFFFFFFFF).toInt
+      if( expectedChecksum !=  checksum ) {
+        return None
+      }
+      return Some(pos+LOG_HEADER_SIZE+length)
+    }
 
-        } catch {
-          case e:IOException =>
-            done = true
+    def verifyAndGetEndPosition:Long = this.synchronized {
+      var pos = start;
+      val limit = start+channel.size()
+      while(pos < limit) {
+        check(pos) match {
+          case Some(next) => pos = next
+          case None => return pos
         }
       }
-      start + offset
+      pos
     }
   }
 
@@ -254,17 +318,17 @@ case class RecordLog(directory: File, lo
 
   def open = {
     log_mutex.synchronized {
-      log_infos = LevelDBClient.find_sequence_files(directory, log_suffix).map { case (position,file) =>
+      log_infos = LevelDBClient.find_sequence_files(directory, logSuffix).map { case (position,file) =>
         position -> LogInfo(file, position, new AtomicLong(file.length()))
       }
 
-      val append_pos = if( log_infos.isEmpty ) {
+      val appendPos = if( log_infos.isEmpty ) {
         0L
       } else {
         val (_, file) = log_infos.last
         val r = LogReader(file.file, file.position)
         try {
-          val rc = r.next_position()
+          val rc = r.verifyAndGetEndPosition
           file.length.set(rc - file.position)
           if( file.file.length != file.length.get() ) {
             // we need to truncate.
@@ -272,23 +336,24 @@ case class RecordLog(directory: File, lo
           }
           rc
         } finally {
-          r.close
+          r.release()
         }
       }
 
-      create_appender(append_pos)
+      create_appender(appendPos)
     }
   }
+
   def close = {
     log_mutex.synchronized {
-      current_appender.close
+      current_appender.release
     }
   }
 
   def appender_limit = current_appender.limit
   def appender_start = current_appender.start
 
-  def next_log(position:Long) = LevelDBClient.create_sequence_file(directory, position, log_suffix)
+  def next_log(position:Long) = LevelDBClient.create_sequence_file(directory, position, logSuffix)
 
   def appender[T](func: (LogAppender)=>T):T= {
     try {
@@ -296,8 +361,8 @@ case class RecordLog(directory: File, lo
     } finally {
       current_appender.flush
       log_mutex.synchronized {
-        if ( current_appender.length.get >= log_size ) {
-          current_appender.close
+        if ( current_appender.length.get >= logSize ) {
+          current_appender.release()
           on_log_rotate()
           create_appender(current_appender.limit)
         }
@@ -307,50 +372,49 @@ case class RecordLog(directory: File, lo
 
   var on_log_rotate: ()=>Unit = ()=>{}
 
-  val next_reader_id = new LongCounter()
-  val reader_cache_files = new HashMap[File, HashSet[Long]];
-  val reader_cache_readers = new LRUCache[Long, LogReader](100) {
-    protected override def onCacheEviction(entry: Entry[Long, LogReader]) = {
-      var key = entry.getKey
-      var value = entry.getValue
-      value.close
-
-      val set = reader_cache_files.get(value.file).get
-      set.remove(key)
-      if( set.isEmpty ) {
-        reader_cache_files.remove(value.file)
-      }
+  private val reader_cache = new LRUCache[File, LogReader](100) {
+    protected override def onCacheEviction(entry: Entry[File, LogReader]) = {
+      entry.getValue.release()
     }
   }
 
   def log_info(pos:Long) = log_mutex.synchronized(log_infos.range(0L, pos+1).lastOption.map(_._2))
 
   private def get_reader[T](pos:Long)(func: (LogReader)=>T) = {
-    val info = log_info(pos)
-    info.map { info =>
-      // Checkout a reader from the cache...
-      val (set, reader_id, reader) = reader_cache_files.synchronized {
-        var set = reader_cache_files.getOrElseUpdate(info.file, new HashSet);
-        if( set.isEmpty ) {
-          val reader_id = next_reader_id.getAndIncrement()
-          val reader = new LogReader(info.file, info.position)
-          set.add(reader_id)
-          reader_cache_readers.put(reader_id, reader)
-          (set, reader_id, reader)
+
+    val lookup = log_mutex.synchronized {
+      val info = log_info(pos)
+      info.map { info=>
+        if(info.position == current_appender.start) {
+          current_appender.retain()
+          (info, current_appender)
         } else {
-          val reader_id = set.head
-          set.remove(reader_id)
-          (set, reader_id, reader_cache_readers.get(reader_id))
+          (info, null)
+        }
+      }
+    }
+
+    lookup.map { case (info, appender) =>
+      val reader = if( appender!=null ) {
+        // read from the current appender.
+        appender
+      } else {
+        // Checkout a reader from the cache...
+        reader_cache.synchronized {
+          var reader = reader_cache.get(info.file)
+          if(reader==null) {
+            reader = LogReader(info.file, info.position)
+            reader_cache.put(info.file, reader)
+          }
+          reader.retain()
+          reader
         }
       }
 
       try {
         func(reader)
       } finally {
-        // check him back in..
-        reader_cache_files.synchronized {
-          set.add(reader_id)
-        }
+        reader.release
       }
     }
   }
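
Beyond validating individual reads, the per-record checksum also lets recovery find the end of
the last log file by scanning records until one fails to parse or verify (the new
LogReader.check / verifyAndGetEndPosition path above, which replaces next_position). Below is a
self-contained sketch of that scan under the same assumptions as the earlier example: JDK-only
I/O, illustrative names, and the 10 byte '*' + kind + checksum + length header.

import java.io.RandomAccessFile
import java.util.zip.CRC32

object LogScanSketch {

  val LOG_HEADER_PREFIX = '*'.toByte
  val LOG_HEADER_SIZE = 10

  // Walk the file record by record and return the offset of the first position
  // that does not start a valid record; everything after it would be truncated.
  def verifyAndGetEndPosition(file: java.io.File): Long = {
    val fd = new RandomAccessFile(file, "r")
    try {
      var pos = 0L
      val limit = fd.length()
      var done = false
      while (!done && pos < limit) {
        checkRecord(fd, pos, limit) match {
          case Some(next) => pos = next
          case None       => done = true
        }
      }
      pos
    } finally fd.close()
  }

  // Validate the header and data checksum of the record at `pos`; return the
  // offset of the next record, or None if the record is invalid or truncated.
  private def checkRecord(fd: RandomAccessFile, pos: Long, limit: Long): Option[Long] = {
    if (pos + LOG_HEADER_SIZE > limit) return None
    fd.seek(pos)
    if (fd.readByte() != LOG_HEADER_PREFIX) return None
    fd.readByte() // kind, not needed for the scan
    val expectedChecksum = fd.readInt()
    val length = fd.readInt()
    if (length < 0 || pos + LOG_HEADER_SIZE + length > limit) return None

    // Read the data in chunks so a corrupted length field cannot force a huge allocation.
    val crc = new CRC32
    val chunk = new Array[Byte](4 * 1024)
    var remaining = length
    while (remaining > 0) {
      val n = fd.read(chunk, 0, math.min(remaining, chunk.length))
      if (n <= 0) return None
      crc.update(chunk, 0, n)
      remaining -= n
    }
    if ((crc.getValue & 0xFFFFFFFF).toInt != expectedChecksum) None
    else Some(pos + LOG_HEADER_SIZE + length)
  }
}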


