activemq-commits mailing list archives

From: chir...@apache.org
Subject: svn commit: r1235623 - in /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb: LevelDBClient.scala RecordLog.scala
Date: Wed, 25 Jan 2012 03:58:36 GMT
Author: chirino
Date: Wed Jan 25 03:58:36 2012
New Revision: 1235623

URL: http://svn.apache.org/viewvc?rev=1235623&view=rev
Log:
Support using log files > 2GB; this also fixes an Int rollover bug that occurred if a log file grew larger than 2GB.
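
For context, a minimal standalone sketch (not part of this commit) of the rollover: narrowing a record offset to Int goes negative once the difference in file positions exceeds Int.MaxValue, while keeping it as a Long, as this commit does, stays correct. The positions below are made-up values.

// Illustration only: made-up positions, the point is the Int overflow past 2GB.
object IntRolloverDemo {
  def main(args: Array[String]): Unit = {
    val position: Long        = 0L                       // start of the log file
    val record_position: Long = 3L * 1024 * 1024 * 1024  // a record 3GB into the file

    // Old style: (record_position - position).toInt overflows and turns negative.
    val old_offset: Int = (record_position - position).toInt
    println(old_offset)                                  // -1073741824

    // New style (this commit): the offset stays a Long.
    val new_offset: Long = record_position - position
    println(new_offset)                                  // 3221225472
  }
}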

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1235623&r1=1235622&r2=1235623&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Wed Jan 25 03:58:36 2012
@@ -216,16 +216,7 @@ class LevelDBClient(store: LevelDBStore)
   }
 
   def log_size = {
-    Option(config.log_size).map(MemoryPropertyEditor.parse(_)).map{size=>
-      if(size == MemoryPropertyEditor.parse("2G")) {
-        Int.MaxValue // which is 2G - 1 (close enough!)
-      } else if(size > Int.MaxValue) {
-        warn("leveldb log_size was configured to be '"+config.log_size+"' but the maximum supported log size is 2G")
-        Int.MaxValue
-      } else {
-        size.toInt
-      }
-    }.getOrElse(1024 * 1024 * 100)
+    Option(config.log_size).map(MemoryPropertyEditor.parse(_)).getOrElse(1024 * 1024 * 100L)
   }
 
   def start() = {

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1235623&r1=1235622&r2=1235623&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Wed Jan 25 03:58:36 2012
@@ -70,7 +70,7 @@ case class RecordLog(directory: File, lo
 
   directory.mkdirs()
 
-  var logSize = 1024 * 1024 * 100
+  var logSize = 1024 * 1024 * 100L
   var current_appender:LogAppender = _
   var verify_checksums = false
   var sync = false
@@ -122,9 +122,11 @@ case class RecordLog(directory: File, lo
     channel.position(logSize-1)
     channel.write(new Buffer(1).toByteBuffer)
     channel.force(true)
-    channel.position(0)
+    if( sync ) {
+      channel.position(0)
+    }
 
-    val write_buffer = new DataByteArrayOutputStream((BUFFER_SIZE)+BUFFER_SIZE)
+    val write_buffer = new DataByteArrayOutputStream(BUFFER_SIZE+LOG_HEADER_SIZE)
 
     def force = {
       flush
@@ -194,7 +196,7 @@ case class RecordLog(directory: File, lo
       }
     }
 
-    override def check_read_flush(end_offset:Int) = {
+    override def check_read_flush(end_offset:Long) = {
       if( flushed_offset.get() < end_offset )  {
         this.synchronized {
           println("read flush")
@@ -216,10 +218,12 @@ case class RecordLog(directory: File, lo
       fd.close()
     }
 
-    def check_read_flush(end_offset:Int) = {}
+    def check_read_flush(end_offset:Long) = {}
     
     def read(record_position:Long, length:Int) = {
-      val offset = (record_position-position).toInt
+      val offset = record_position-position
+      assert(offset >=0 )
+      
       check_read_flush(offset+LOG_HEADER_SIZE+length)
       
       if(verify_checksums) {
@@ -273,7 +277,7 @@ case class RecordLog(directory: File, lo
     }
 
     def read(record_position:Long) = {
-      val offset = (record_position-position).toInt
+      val offset = record_position-position
       val header = new Buffer(LOG_HEADER_SIZE)
       channel.read(header.toByteBuffer, offset)
       val is = header.bigEndianEditor();
@@ -300,7 +304,7 @@ case class RecordLog(directory: File, lo
     }
 
     def check(record_position:Long):Option[(Long, Option[Long])] = {
-      var offset = (record_position-position).toInt
+      var offset = record_position-position
       val header = new Buffer(LOG_HEADER_SIZE)
       channel.read(header.toByteBuffer, offset)
       val is = header.bigEndianEditor();
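
As a follow-up illustration, a self-contained before/after of the log_size handling now that it stays a Long end to end. parse_bytes is a hypothetical stand-in for MemoryPropertyEditor.parse (not the real Apollo editor), and "4G" is an arbitrary example value.

// Hypothetical stand-in for MemoryPropertyEditor.parse: returns a byte count as a Long.
def parse_bytes(s: String): Long = s.last.toUpper match {
  case 'K' => s.dropRight(1).toLong * 1024L
  case 'M' => s.dropRight(1).toLong * 1024L * 1024L
  case 'G' => s.dropRight(1).toLong * 1024L * 1024L * 1024L
  case _   => s.toLong
}

// Before this commit the parsed size was narrowed to Int and capped at Int.MaxValue (~2GB);
// after it, the Long flows through unchanged, so sizes above 2GB are usable.
val configured: Option[String] = Some("4G")
val log_size: Long = configured.map(parse_bytes).getOrElse(1024 * 1024 * 100L)
// log_size == 4294967296L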


