activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1234369 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq/ap...
Date Sat, 21 Jan 2012 17:13:58 GMT
Author: chirino
Date: Sat Jan 21 17:13:57 2012
New Revision: 1234369

URL: http://svn.apache.org/viewvc?rev=1234369&view=rev
Log:
APLO-133 - Fixing LevelDB Store import/export not working

Simplified the import/export store api by removing the use of continuations.
The export/import files now store version info so that an import know what schema was used
to export the data.  Related to APLO-131

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
    activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1234369&r1=1234368&r2=1234369&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
Sat Jan 21 17:13:57 2012
@@ -31,7 +31,8 @@ import scala.Some
 import java.sql.ClientInfoStatus
 import com.sleepycat.je._
 import javax.management.remote.rmi._RMIConnection_Stub
-import org.fusesource.hawtbuf.Buffer
+import org.apache.activemq.apollo.util.FileSupport._
+import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
 
 object BDBClient extends Log
 /**
@@ -548,8 +549,12 @@ class BDBClient(store: BDBStore) {
     }
   }
 
-  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = {
+  def export_pb(streams:StreamManager[OutputStream]):Option[String] = {
     try {
+      streams.using_version_stream{ stream=>
+        new AsciiBuffer("1").writeTo(stream)
+      }
+
       with_ctx() { ctx=>
         import ctx._
         import PBSupport._
@@ -594,14 +599,25 @@ class BDBClient(store: BDBStore) {
         }
 
       }
-      Success(Zilch)
+      None
     } catch {
       case x:Exception=>
-        Failure(x.getMessage)
+        Some(x.getMessage)
     }
   }
 
-  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] = {
+  def import_pb(streams:StreamManager[InputStream]):Option[String] = {
+    try {
+      streams.using_version_stream {
+        stream =>
+          var ver = read_text(stream).toInt
+          if (ver != 1) {
+            return Some("Cannot import from an export file at version: "+ver)
+          }
+      }
+    } catch {
+      case e => return Some("Could not determine export format version: "+e)
+    }
     try {
       purge
 
@@ -660,11 +676,11 @@ class BDBClient(store: BDBStore) {
           }
         }
       }
-      Success(Zilch)
+      None
 
     } catch {
       case x:Exception=>
-        Failure(x.getMessage)
+        Some(x.getMessage)
     }
   }
 }

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1234369&r1=1234368&r2=1234369&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
Sat Jan 21 17:13:57 2012
@@ -228,16 +228,16 @@ class BDBStore(var config:BDBStoreDTO) e
    * Exports the contents of the store to the provided streams.  Each stream should contain
    * a list of framed protobuf objects with the corresponding object types.
    */
-  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable =
write_executor ! {
-    client.export_pb(streams)
+  def export_pb(streams:StreamManager[OutputStream], cb:(Option[String])=>Unit) = write_executor
{
+    cb(client.export_pb(streams))
   }
 
   /**
    * Imports a previously exported set of streams.  This deletes any previous data
    * in the store.
    */
-  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = write_executor
! {
-    client.import_pb(streams)
+  def import_pb(streams:StreamManager[InputStream], cb:(Option[String])=>Unit) = write_executor
{
+    cb(client.import_pb(streams))
   }
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala?rev=1234369&r1=1234368&r2=1234369&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala
Sat Jan 21 17:13:57 2012
@@ -18,7 +18,6 @@ package org.apache.activemq.apollo.broke
 
 import dto.BDBStoreDTO
 import org.apache.activemq.apollo.broker.store.{Store, StoreFunSuiteSupport}
-import org.apache.activemq.apollo.util.FileSupport._
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1234369&r1=1234368&r2=1234369&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
Sat Jan 21 17:13:57 2012
@@ -19,17 +19,50 @@ package org.apache.activemq.apollo.broke
 import org.apache.activemq.apollo.dto.StoreStatusDTO
 import org.apache.activemq.apollo.util._
 import java.io.{InputStream, OutputStream}
-import scala.util.continuations._
-import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
+import java.util.concurrent.atomic.AtomicReference
 import org.fusesource.hawtbuf.Buffer
+import java.util.zip.{ZipFile, ZipEntry, ZipOutputStream}
+import FileSupport._
 
 trait StreamManager[A] {
+  def using_version_stream(func: (A)=>Unit)
   def using_map_stream(func: (A)=>Unit)
   def using_queue_stream(func: (A)=>Unit)
   def using_message_stream(func: (A)=>Unit)
   def using_queue_entry_stream(func: (A)=>Unit)
 }
 
+case class ZipOputputStreamManager(out:ZipOutputStream) extends StreamManager[OutputStream]()
{
+  def entry(name:String, func: (OutputStream) => Unit) = {
+    out.putNextEntry(new ZipEntry(name));
+    func(out)
+    out.closeEntry();
+  }
+  def using_version_stream(func: (OutputStream) => Unit) = entry("version.txt", func)
+  def using_queue_stream(func: (OutputStream) => Unit) = entry("queues.dat", func)
+  def using_queue_entry_stream(func: (OutputStream) => Unit) = entry("queue_entries.dat",
func)
+  def using_message_stream(func: (OutputStream) => Unit) = entry("messages.dat", func)
+  def using_map_stream(func: (OutputStream) => Unit) = entry("map.dat", func)
+}
+
+case class ZipInputStreamManager(zip:ZipFile) extends StreamManager[InputStream]() {
+  def entry(name:String, func: (InputStream) => Unit) = {
+    val entry = zip.getEntry(name)
+    if(entry == null) {
+      sys.error("Invalid data file, zip entry not found: "+name);
+    }
+    using(zip.getInputStream(entry)) { is=>
+      func(is)
+    }
+  }
+  def using_version_stream(func: (InputStream) => Unit) = entry("version.txt", func)
+  def using_queue_stream(func: (InputStream) => Unit) = entry("queues.dat", func)
+  def using_queue_entry_stream(func: (InputStream) => Unit) = entry("queue_entries.dat",
func)
+  def using_message_stream(func: (InputStream) => Unit) = entry("messages.dat", func)
+  def using_map_stream(func: (InputStream) => Unit) = entry("map.dat", func)
+}
+
+
 /**
  * <p>
  * The Store is service which offers asynchronous persistence services
@@ -116,11 +149,11 @@ trait Store extends ServiceTrait {
    * Exports the contents of the store to the provided streams.  Each stream should contain
    * a list of framed protobuf objects with the corresponding object types.
    */
-  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable
+  def export_pb(streams:StreamManager[OutputStream], cb:(Option[String])=>Unit):Unit
 
   /**
    * Imports a previously exported set of streams.  This deletes any previous data
    * in the store.
    */
-  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable
+  def import_pb(streams:StreamManager[InputStream], cb:(Option[String])=>Unit):Unit
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1234369&r1=1234368&r2=1234369&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
Sat Jan 21 17:13:57 2012
@@ -22,9 +22,11 @@ import java.util.concurrent.{TimeUnit, C
 import collection.mutable.ListBuffer
 import org.apache.activemq.apollo.util.{LoggingTracker, FunSuiteSupport, LongCounter}
 import org.scalatest.BeforeAndAfterEach
-import java.io.File
 import org.apache.activemq.apollo.util.FileSupport._
 import java.util.concurrent.atomic.AtomicReference
+import java.util.zip.{ZipFile, ZipOutputStream, ZipEntry}
+import java.io._
+import org.apache.activemq.apollo.util.sync_cb
 
 /**
  * <p>Implements generic testing of Store implementations.</p>
@@ -41,20 +43,7 @@ abstract class StoreFunSuiteSupport exte
    * Handy helper to call an async method on the store and wait for
    * the result of the callback.
    */
-  def CB[T](func: (T=>Unit)=>Unit ) = {
-    class X {
-      var value:T = _
-    }
-    val rc = new X
-    val cd = new CountDownLatch(1)
-    def cb(x:T) = {
-      rc.value = x
-      cd.countDown
-    }
-    func(cb)
-    cd.await
-    rc.value
-  }
+
 
   def data_directory = basedir / "target" / "apollo-data"
 
@@ -75,15 +64,20 @@ abstract class StoreFunSuiteSupport exte
   }
 
   override protected def beforeEach() = {
+    purge
+  }
+
+  def purge {
     val tracker = new LoggingTracker("store startup")
     val task = tracker.task("purge")
     store.purge(task.run)
     tracker.await
   }
 
+
   def expectCB[T](expected:T)(func: (T=>Unit)=>Unit ) = {
     expect(expected) {
-      CB(func)
+      sync_cb(func)
     }
   }
 
@@ -91,7 +85,7 @@ abstract class StoreFunSuiteSupport exte
 
   def add_queue(name:String):Long = {
     var queue_a = QueueRecord(queue_key_counter.incrementAndGet, ascii("test"), ascii(name))
-    val rc:Boolean = CB( cb=> store.add_queue(queue_a)(cb) )
+    val rc:Boolean = sync_cb( cb=> store.add_queue(queue_a)(cb) )
     expect(true)(rc)
     queue_a.key
   }
@@ -141,11 +135,62 @@ abstract class StoreFunSuiteSupport exte
     msg_keys
   }
 
+  test("export and import") {
+    val A = add_queue("A")
+    val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
+
+    val rc:Option[MessageRecord] = sync_cb( cb=> store.load_message(msg_keys.head._1,
msg_keys.head._2)(cb) )
+    expect(ascii("message 1").buffer) {
+      rc.get.buffer
+    }
+
+    val file = test_data_dir / "export.zip"
+    file.getParentFile.mkdirs()
+    using( new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(file)))) { out=>
+      val manager = ZipOputputStreamManager(out)
+
+      // Export the data...
+      expect(None) {
+        sync_cb[Option[String]] { cb =>
+          store.export_pb(manager, cb)
+        }
+      }
+    }
+
+    // purge the data..
+    purge
+
+    // There should ne no queues..
+    expectCB(Seq[Long]()) { cb=>
+      store.list_queues(cb)
+    }
+
+    // Import the data..
+    val zip = new ZipFile(file)
+    try {
+      val manager = ZipInputStreamManager(zip)
+      expect(None) {
+        sync_cb[Option[String]] { cb =>
+          store.import_pb(manager, cb)
+        }
+      }
+    } finally {
+      zip.close
+    }
+
+    // The data should be there now again..
+    val queues:Seq[Long] = sync_cb(store.list_queues(_))
+    expect(1)(queues.size)
+    val entries:Seq[QueueEntryRecord] = sync_cb(cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb))
+    expect(3) ( entries.size  )
+
+  }
+
   test("load stored message") {
     val A = add_queue("A")
     val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
 
-    val rc:Option[MessageRecord] = CB( cb=> store.load_message(msg_keys.head._1, msg_keys.head._2)(cb)
)
+    val rc:Option[MessageRecord] = sync_cb( cb=> store.load_message(msg_keys.head._1,
msg_keys.head._2)(cb) )
     expect(ascii("message 1").buffer) {
       rc.get.buffer
     }
@@ -166,7 +211,7 @@ abstract class StoreFunSuiteSupport exte
     val A = add_queue("my queue name")
     populate(A, "message 1"::"message 2"::"message 3"::Nil)
 
-    val rc:Option[QueueRecord] = CB( cb=> store.get_queue(A)(cb) )
+    val rc:Option[QueueRecord] = sync_cb( cb=> store.get_queue(A)(cb) )
     expect(ascii("my queue name")) {
       rc.get.binding_data.ascii
     }
@@ -176,7 +221,7 @@ abstract class StoreFunSuiteSupport exte
     val A = add_queue("A")
     val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
 
-    val rc:Seq[QueueEntryRecord] = CB( cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb)
)
+    val rc:Seq[QueueEntryRecord] = sync_cb( cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb)
)
     expect(msg_keys.toSeq.map(_._1)) {
       rc.map( _.message_key )
     }

Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala?rev=1234369&r1=1234368&r2=1234369&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
Sat Jan 21 17:13:57 2012
@@ -21,11 +21,12 @@ import org.apache.activemq.apollo.util.F
 import org.apache.activemq.apollo.dto.VirtualHostDTO
 import org.apache.activemq.apollo.util._
 import java.util.zip.{ZipEntry, ZipOutputStream}
-import org.apache.activemq.apollo.broker.store.{StreamManager, StoreFactory}
-import java.io.{OutputStream, FileOutputStream, File}
-import scala.util.continuations._
 import org.apache.felix.service.command.CommandSession
 import org.apache.activemq.apollo.broker.ConfigStore
+import java.io._
+import java.util.concurrent.CountDownLatch
+import org.apache.activemq.apollo.broker.store.ZipOputputStreamManager._
+import org.apache.activemq.apollo.broker.store.{ZipOputputStreamManager, StreamManager, StoreFactory}
 
 /**
  * The apollo stop command
@@ -41,8 +42,8 @@ class StoreExport extends Action {
   @option(name = "--virtual-host", description = "The id of the virtual host to export, if
not specified, the default virtual host is selected.")
   var host: String = _
 
-  @argument(name = "dest", description = "The destination file to hold the exported data",
index=0, required=true)
-  var dest:File = _
+  @argument(name = "file", description = "The zip file to hold the exported data", index=0,
required=true)
+  var file:File = _
 
   def execute(session: CommandSession):AnyRef = {
     import Helper._
@@ -79,27 +80,18 @@ class StoreExport extends Action {
       }
 
       ServiceControl.start(store, "store startup")
-      using( new ZipOutputStream(new FileOutputStream(dest))) { out=>
+      session.getConsole.println("Exporting... (this might take a while)")
+      using( new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(file)))) {
out=>
         out.setMethod(ZipEntry.DEFLATED)
         out.setLevel(9)
-        val manager = new StreamManager[OutputStream]() {
-          def entry(name:String, func: (OutputStream) => Unit) = {
-            out.putNextEntry(new ZipEntry(name));
-            func(out)
-            out.closeEntry();
-          }
-          def using_queue_stream(func: (OutputStream) => Unit) = entry("queues.dat", func)
-          def using_queue_entry_stream(func: (OutputStream) => Unit) = entry("queue_entries.dat",
func)
-          def using_message_stream(func: (OutputStream) => Unit) = entry("messages.dat",
func)
-          def using_map_stream(func: (OutputStream) => Unit) = entry("map.dat", func)
-        }
-        reset {
-          val rc = store.export_pb(manager)
-          rc.failure_option.foreach(error _)
-        }
-      }
+        val manager = ZipOputputStreamManager(out)
 
+        sync_cb[Option[String]] { cb =>
+          store.export_pb(manager, cb)
+        }.foreach(error _)
+      }
       ServiceControl.stop(store, "store stop");
+      session.getConsole.println("Done. Export located at: "+file)
 
     } catch {
       case x:Failure=>

Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala?rev=1234369&r1=1234368&r2=1234369&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
Sat Jan 21 17:13:57 2012
@@ -20,12 +20,13 @@ import org.apache.felix.gogo.commands.{A
 import org.apache.activemq.apollo.util.FileSupport._
 import org.apache.activemq.apollo.dto.VirtualHostDTO
 import org.apache.activemq.apollo.util._
-import org.apache.activemq.apollo.broker.store.{StreamManager, StoreFactory}
 import scala.util.continuations._
 import java.util.zip.ZipFile
 import java.io.{InputStream, File}
 import org.apache.felix.service.command.CommandSession
 import org.apache.activemq.apollo.broker.ConfigStore
+import org.apache.activemq.apollo.broker.store.ZipInputStreamManager._
+import org.apache.activemq.apollo.broker.store.{ZipInputStreamManager, StreamManager, StoreFactory}
 
 
 /**
@@ -39,11 +40,11 @@ class StoreImport extends Action {
   @option(name = "--conf", description = "The Apollo configuration file.")
   var conf: File = _
 
-  @option(name = "--virtual-host", description = "The id of the virtual host to export, if
not specified, the default virtual host is selected.")
+  @option(name = "--virtual-host", description = "The id of the virtual host to import into,
if not specified, the default virtual host is selected.")
   var host: String = _
 
-  @argument(name = "dest", description = "The destination file to hold the exported data",
index=0, required=true)
-  var dest:File = _
+  @argument(name = "file", description = "The zip file the contains that data for the import",
index=0, required=true)
+  var file:File = _
 
   def execute(session: CommandSession):AnyRef = {
     import Helper._
@@ -81,32 +82,19 @@ class StoreImport extends Action {
 
       ServiceControl.start(store, "store startup")
 
-      val zip = new ZipFile(dest)
+      session.getConsole.println("Importing: "+file)
+      val zip = new ZipFile(file)
       try {
-        val manager = new StreamManager[InputStream]() {
-          def entry(name:String, func: (InputStream) => Unit) = {
-            val entry = zip.getEntry(name)
-            if(entry == null) {
-              error("Invalid data file, zip entry not found: "+name);
-            }
-            using(zip.getInputStream(entry)) { is=>
-              func(is)
-            }
-          }
-          def using_queue_stream(func: (InputStream) => Unit) = entry("queues.dat", func)
-          def using_queue_entry_stream(func: (InputStream) => Unit) = entry("queue_entries.dat",
func)
-          def using_message_stream(func: (InputStream) => Unit) = entry("messages.dat",
func)
-          def using_map_stream(func: (InputStream) => Unit) = entry("map.dat", func)
-        }
-        reset {
-          val rc = store.import_pb(manager)
-          rc.failure_option.foreach(error _)
-        }
+        val manager = ZipInputStreamManager(zip)
+        sync_cb[Option[String]] { cb =>
+          store.import_pb(manager, cb)
+        }.foreach(error _)
       } finally {
         zip.close
       }
 
       ServiceControl.stop(store, "store stop");
+      session.getConsole.println("Done.")
 
     } catch {
       case x:Failure=>

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1234369&r1=1234368&r2=1234369&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Sat Jan 21 17:13:57 2012
@@ -30,7 +30,6 @@ import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.util.{TreeMap=>ApolloTreeMap}
 import collection.immutable.TreeMap
 import org.fusesource.leveldbjni.internal.Util
-import org.fusesource.hawtbuf.{Buffer, AbstractVarIntSupport}
 import org.apache.activemq.apollo.broker.Broker
 import org.apache.activemq.apollo.util.ProcessSupport._
 import collection.mutable.{HashMap, ListBuffer}
@@ -39,6 +38,7 @@ import org.iq80.leveldb._
 import org.apache.activemq.apollo.broker.store.leveldb.RecordLog.LogInfo
 import org.apache.activemq.apollo.broker.store.PBSupport
 import java.util.concurrent.atomic.AtomicReference
+import org.fusesource.hawtbuf.{AsciiBuffer, Buffer, AbstractVarIntSupport}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -1063,8 +1063,12 @@ class LevelDBClient(store: LevelDBStore)
 //  }
 
 
-  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = {
+  def export_pb(streams:StreamManager[OutputStream]):Option[String] = {
     try {
+      streams.using_version_stream{ stream=>
+        new AsciiBuffer("1").writeTo(stream)
+      }
+
       retry_using_index {
         
         // Delete all the tmp keys..
@@ -1110,23 +1114,29 @@ class LevelDBClient(store: LevelDBStore)
           // Figure out the active log locations..
           streams.using_queue_entry_stream { stream=>
             index.cursor_prefixed(queue_entry_prefix_array, ro) { (_, value) =>
-              write_framed(stream, value)
-              val record = QueueEntryPB.FACTORY.parseUnframed(value)
+              val record = QueueEntryPB.FACTORY.parseUnframed(value).copy()
               val (pos, len) = decode_locator(record.getMessageLocator)
+              record.setMessageKey(pos)
+              write_framed(stream, record.freeze().toUnframedByteArray)
               index.put(encode_key(tmp_prefix, pos), encode_vlong(len))
               true
             }
           }
 
           streams.using_message_stream { stream=>
-            index.cursor_prefixed(Array(tmp_prefix), ro) { (key, value) =>
+            index.cursor_prefixed(Array(tmp_prefix)) { (key, value) =>
               val (_, pos) = decode_long_key(key)
               val len = decode_vlong(value).toInt
               log.read(pos, len).foreach { value =>
                 // Set the message key to be the position in the log.
-                val pb = MessagePB.FACTORY.parseUnframed(value).copy
-                pb.setMessageKey(pos)
-                write_framed(stream, pb.freeze.toUnframedBuffer)
+                val copy = MessagePB.FACTORY.parseUnframed(value).copy
+                copy.setMessageKey(pos)
+                val data = copy.freeze.toUnframedBuffer
+                
+                val check = MessagePB.FACTORY.parseUnframed(data).copy
+                assert(check.getMessageKey == pos)
+                
+                write_framed(stream, data)
               }
               true
             }
@@ -1141,14 +1151,26 @@ class LevelDBClient(store: LevelDBStore)
         }
 
       }
-      Success(Zilch)
+      None
     } catch {
       case x:Exception=>
-        Failure(x.getMessage)
+        Some(x.getMessage)
     }
   }
 
-  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] = {
+  def import_pb(streams:StreamManager[InputStream]):Option[String] = {
+    try {
+      streams.using_version_stream {
+        stream =>
+          var ver = read_text(stream).toInt
+          if (ver != 1) {
+            return Some("Cannot import from an export file at version: "+ver)
+          }
+      }
+    } catch {
+      case e => return Some("Could not determine export format version: "+e)
+    }
+
     try {
       purge
       log_refs.clear()
@@ -1169,6 +1191,7 @@ class LevelDBClient(store: LevelDBStore)
         }
 
         log.appender { appender =>
+                
           streams.using_map_stream { stream=>
             foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb =>
               index.put(encode_key(map_prefix, pb.getKey), pb.getValue.toByteArray)
@@ -1192,14 +1215,17 @@ class LevelDBClient(store: LevelDBStore)
           streams.using_queue_entry_stream { stream=>
             foreach[QueueEntryPB.Buffer](stream, QueueEntryPB.FACTORY) { record=>
               val copy = record.copy();
-              index.get(encode_key(tmp_prefix, record.getMessageKey)).foreach { locator=>
+              var original_msg_key: Long = record.getMessageKey
+              index.get(encode_key(tmp_prefix, original_msg_key)) match {
+                case Some(locator)=>
                 val (pos, len) = decode_locator(locator)
                 copy.setMessageLocator(locator)
                 index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq),
copy.freeze().toUnframedBuffer)
-                
                 log.log_info(pos).foreach { log_info =>
                   log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
                 }
+                case None =>
+                  println("Invalid queue entry, references message that was not in the export:
"+original_msg_key)
               }
             }
           }
@@ -1214,11 +1240,11 @@ class LevelDBClient(store: LevelDBStore)
       }
       
       snapshot_index
-      Success(Zilch)
+      None
 
     } catch {
       case x:Exception=>
-        Failure(x.getMessage)
+        Some(x.getMessage)
     }
   }
 }

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1234369&r1=1234368&r2=1234369&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
Sat Jan 21 17:13:57 2012
@@ -290,16 +290,16 @@ class LevelDBStore(val config:LevelDBSto
    * Exports the contents of the store to the provided streams.  Each stream should contain
    * a list of framed protobuf objects with the corresponding object types.
    */
-  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable =
write_executor ! {
-    client.export_pb(streams)
+  def export_pb(streams:StreamManager[OutputStream], cb:(Option[String])=>Unit) = write_executor
{
+    cb(client.export_pb(streams))
   }
 
   /**
    * Imports a previously exported set of streams.  This deletes any previous data
    * in the store.
    */
-  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = write_executor
! {
-    client.import_pb(streams)
+  def import_pb(streams:StreamManager[InputStream], cb:(Option[String])=>Unit) = write_executor
{
+    cb(client.import_pb(streams))
   }
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala?rev=1234369&r1=1234368&r2=1234369&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala
Sat Jan 21 17:13:57 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.apollo
 
 import org.fusesource.hawtdispatch._
 import org.fusesource.hawtdispatch.Future
+import java.util.concurrent.CountDownLatch
 
 /**
  *
@@ -69,5 +70,16 @@ package object util {
     }
   }
 
+  def sync_cb[T](func: (T=>Unit)=>Unit ) = {
+    var rc:Option[T] = null
+    val cd = new CountDownLatch(1)
+    def cb(x:T) = {
+      rc = Some(x)
+      cd.countDown
+    }
+    func(cb)
+    cd.await
+    rc.get
+  }
 
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala?rev=1234369&r1=1234368&r2=1234369&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala
Sat Jan 21 17:13:57 2012
@@ -50,4 +50,15 @@ class LongCounter(private var value:Long
   }
 
   override def toString() = get().toString
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case obj:LongCounter =>
+        obj.value == value
+      case _ =>
+        false
+    }
+  }
+
+  override def hashCode(): Int = value.hashCode()
 }
\ No newline at end of file



Mime
View raw message