activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1235398 - /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
Date Tue, 24 Jan 2012 18:30:29 GMT
Author: chirino
Date: Tue Jan 24 18:30:29 2012
New Revision: 1235398

URL: http://svn.apache.org/viewvc?rev=1235398&view=rev
Log:
Fixes APLO-132 : Disabled the buffer bypassing optimization for now, was causing 256k stomp-benchmark
scenarios to break.

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1235398&r1=1235397&r2=1235398&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala Tue Jan 24 18:30:29 2012
@@ -26,8 +26,8 @@ import java.util.concurrent.atomic.Atomi
 import java.io._
 import org.apache.activemq.apollo.util.FileSupport._
 import org.apache.activemq.apollo.util.{Log, LRUCache}
-import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
-import org.fusesource.hawtbuf.{AbstractVarIntSupport, DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
+import org.fusesource.hawtdispatch.BaseRetained
+import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, Buffer}
 
 object RecordLog extends Log {
 
@@ -149,7 +149,7 @@ case class RecordLog(directory: File, lo
       val cs: Int = checksum(data)
 //      trace("Writing at: "+record_position+" len: "+data_length+" with checksum: "+cs)
 
-      if( total_length > BYPASS_BUFFER_SIZE ) {
+      if( false && total_length > BYPASS_BUFFER_SIZE ) {
 
         // Write the header and flush..
         write_buffer.writeByte(LOG_HEADER_PREFIX)
@@ -226,13 +226,26 @@ case class RecordLog(directory: File, lo
 
         val record = new Buffer(LOG_HEADER_SIZE+length)
 
+        def record_is_not_changing = {
+          using(open) { fd =>
+            val channel = fd.getChannel
+            val new_record = new Buffer(LOG_HEADER_SIZE+length)
+            channel.read(new_record.toByteBuffer, offset)
+            var same = record == new_record
+            println(same)
+            same
+          }
+        }
+
         if( channel.read(record.toByteBuffer, offset) != record.length ) {
+          assert( record_is_not_changing )
           throw new IOException("short record at position: "+record_position+" in file: "+file+", offset: "+offset)
         }
 
         val is = new DataByteArrayInputStream(record)
         val prefix = is.readByte()
         if( prefix != LOG_HEADER_PREFIX ) {
+          assert(record_is_not_changing)
           throw new IOException("invalid record at position: "+record_position+" in file: "+file+", offset: "+offset)
         }
 
@@ -244,6 +257,7 @@ case class RecordLog(directory: File, lo
         // If your reading the whole record we can verify the data checksum
         if( expectedLength == length ) {
           if( expectedChecksum != checksum(data) ) {
+            assert(record_is_not_changing)
             throw new IOException("checksum does not match at position: "+record_position+" in file: "+file+", offset: "+offset)
           }
         }



Mime
View raw message