Subject: svn commit: r1234369 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq/ap...
Date: Sat, 21 Jan 2012 17:13:58 -0000
To: commits@activemq.apache.org
From: chirino@apache.org
Message-Id: <20120121171358.9B3B12388993@eris.apache.org>

Author: chirino
Date: Sat Jan 21 17:13:57 2012
New Revision: 1234369

URL: http://svn.apache.org/viewvc?rev=1234369&view=rev
Log:
APLO-133 - Fixing LevelDB Store import/export not working

Simplified the import/export store API by removing the use of continuations. The export/import files now store version info so that an import knows what schema was used to export the data.
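For context, a minimal, self-contained Scala sketch of the simplified API shape described above. The StreamManager methods and the callback-based export_pb/import_pb signatures mirror the Store.scala and util.scala changes in the diff below; the StoreExportImport trait, SyncCallbackExample object and export_or_fail helper are illustrative stand-ins, not code from this commit.

    import java.io.{InputStream, OutputStream}
    import java.util.concurrent.CountDownLatch

    // Stand-in for the trait defined in Store.scala below; note the new
    // using_version_stream entry point added by this commit.
    trait StreamManager[A] {
      def using_version_stream(func: A => Unit): Unit
      def using_map_stream(func: A => Unit): Unit
      def using_queue_stream(func: A => Unit): Unit
      def using_message_stream(func: A => Unit): Unit
      def using_queue_entry_stream(func: A => Unit): Unit
    }

    // The simplified export/import contract: no continuations, the outcome is
    // reported through a callback where None = success, Some(msg) = failure.
    trait StoreExportImport {
      def export_pb(streams: StreamManager[OutputStream], cb: Option[String] => Unit): Unit
      def import_pb(streams: StreamManager[InputStream], cb: Option[String] => Unit): Unit
    }

    object SyncCallbackExample {
      // Blocks until the callback fires, mirroring the sync_cb helper added in util.scala.
      def sync_cb[T](func: (T => Unit) => Unit): T = {
        var rc: Option[T] = None
        val cd = new CountDownLatch(1)
        func { x => rc = Some(x); cd.countDown() }
        cd.await()
        rc.get
      }

      // Example caller: turn the async export into a blocking call and fail on error.
      def export_or_fail(store: StoreExportImport, streams: StreamManager[OutputStream]): Unit =
        sync_cb[Option[String]](cb => store.export_pb(streams, cb)).foreach(msg => sys.error(msg))
    }
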
Related to APLO-131 Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1234369&r1=1234368&r2=1234369&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original) +++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Sat Jan 21 17:13:57 2012 @@ -31,7 +31,8 @@ import scala.Some import java.sql.ClientInfoStatus import com.sleepycat.je._ import javax.management.remote.rmi._RMIConnection_Stub -import org.fusesource.hawtbuf.Buffer +import org.apache.activemq.apollo.util.FileSupport._ +import org.fusesource.hawtbuf.{AsciiBuffer, Buffer} object BDBClient extends Log /** @@ -548,8 +549,12 @@ class BDBClient(store: BDBStore) { } } - def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = { + def export_pb(streams:StreamManager[OutputStream]):Option[String] = { try { + streams.using_version_stream{ stream=> + new AsciiBuffer("1").writeTo(stream) + } + with_ctx() { ctx=> import ctx._ import PBSupport._ @@ -594,14 +599,25 @@ class BDBClient(store: BDBStore) { } } - Success(Zilch) + None } catch { case x:Exception=> - Failure(x.getMessage) + Some(x.getMessage) } } - def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] = { + def import_pb(streams:StreamManager[InputStream]):Option[String] = { + try { + streams.using_version_stream { + stream => + var ver = read_text(stream).toInt + if (ver != 1) { + return Some("Cannot import from an export file at version: "+ver) + } + } + } catch { + case e => return Some("Could not determine export format version: "+e) + } try { purge @@ -660,11 +676,11 @@ class BDBClient(store: BDBStore) { } } } - Success(Zilch) + None } catch { case x:Exception=> - Failure(x.getMessage) + Some(x.getMessage) } } } Modified: 
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1234369&r1=1234368&r2=1234369&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala (original) +++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala Sat Jan 21 17:13:57 2012 @@ -228,16 +228,16 @@ class BDBStore(var config:BDBStoreDTO) e * Exports the contents of the store to the provided streams. Each stream should contain * a list of framed protobuf objects with the corresponding object types. */ - def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable = write_executor ! { - client.export_pb(streams) + def export_pb(streams:StreamManager[OutputStream], cb:(Option[String])=>Unit) = write_executor { + cb(client.export_pb(streams)) } /** * Imports a previously exported set of streams. This deletes any previous data * in the store. */ - def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = write_executor ! { - client.import_pb(streams) + def import_pb(streams:StreamManager[InputStream], cb:(Option[String])=>Unit) = write_executor { + cb(client.import_pb(streams)) } } Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala?rev=1234369&r1=1234368&r2=1234369&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala (original) +++ activemq/activemq-apollo/trunk/apollo-bdb/src/test/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStoreTest.scala Sat Jan 21 17:13:57 2012 @@ -18,7 +18,6 @@ package org.apache.activemq.apollo.broke import dto.BDBStoreDTO import org.apache.activemq.apollo.broker.store.{Store, StoreFunSuiteSupport} -import org.apache.activemq.apollo.util.FileSupport._ /** * @author Hiram Chirino Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1234369&r1=1234368&r2=1234369&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala Sat Jan 21 17:13:57 2012 @@ -19,17 +19,50 @@ package org.apache.activemq.apollo.broke import org.apache.activemq.apollo.dto.StoreStatusDTO import org.apache.activemq.apollo.util._ import java.io.{InputStream, OutputStream} -import scala.util.continuations._ -import java.util.concurrent.atomic.{AtomicReference, AtomicLong} +import java.util.concurrent.atomic.AtomicReference import org.fusesource.hawtbuf.Buffer +import java.util.zip.{ZipFile, ZipEntry, 
ZipOutputStream} +import FileSupport._ trait StreamManager[A] { + def using_version_stream(func: (A)=>Unit) def using_map_stream(func: (A)=>Unit) def using_queue_stream(func: (A)=>Unit) def using_message_stream(func: (A)=>Unit) def using_queue_entry_stream(func: (A)=>Unit) } +case class ZipOputputStreamManager(out:ZipOutputStream) extends StreamManager[OutputStream]() { + def entry(name:String, func: (OutputStream) => Unit) = { + out.putNextEntry(new ZipEntry(name)); + func(out) + out.closeEntry(); + } + def using_version_stream(func: (OutputStream) => Unit) = entry("version.txt", func) + def using_queue_stream(func: (OutputStream) => Unit) = entry("queues.dat", func) + def using_queue_entry_stream(func: (OutputStream) => Unit) = entry("queue_entries.dat", func) + def using_message_stream(func: (OutputStream) => Unit) = entry("messages.dat", func) + def using_map_stream(func: (OutputStream) => Unit) = entry("map.dat", func) +} + +case class ZipInputStreamManager(zip:ZipFile) extends StreamManager[InputStream]() { + def entry(name:String, func: (InputStream) => Unit) = { + val entry = zip.getEntry(name) + if(entry == null) { + sys.error("Invalid data file, zip entry not found: "+name); + } + using(zip.getInputStream(entry)) { is=> + func(is) + } + } + def using_version_stream(func: (InputStream) => Unit) = entry("version.txt", func) + def using_queue_stream(func: (InputStream) => Unit) = entry("queues.dat", func) + def using_queue_entry_stream(func: (InputStream) => Unit) = entry("queue_entries.dat", func) + def using_message_stream(func: (InputStream) => Unit) = entry("messages.dat", func) + def using_map_stream(func: (InputStream) => Unit) = entry("map.dat", func) +} + + /** *

* The Store is service which offers asynchronous persistence services @@ -116,11 +149,11 @@ trait Store extends ServiceTrait { * Exports the contents of the store to the provided streams. Each stream should contain * a list of framed protobuf objects with the corresponding object types. */ - def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable + def export_pb(streams:StreamManager[OutputStream], cb:(Option[String])=>Unit):Unit /** * Imports a previously exported set of streams. This deletes any previous data * in the store. */ - def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable + def import_pb(streams:StreamManager[InputStream], cb:(Option[String])=>Unit):Unit } \ No newline at end of file Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1234369&r1=1234368&r2=1234369&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Sat Jan 21 17:13:57 2012 @@ -22,9 +22,11 @@ import java.util.concurrent.{TimeUnit, C import collection.mutable.ListBuffer import org.apache.activemq.apollo.util.{LoggingTracker, FunSuiteSupport, LongCounter} import org.scalatest.BeforeAndAfterEach -import java.io.File import org.apache.activemq.apollo.util.FileSupport._ import java.util.concurrent.atomic.AtomicReference +import java.util.zip.{ZipFile, ZipOutputStream, ZipEntry} +import java.io._ +import org.apache.activemq.apollo.util.sync_cb /** *

Implements generic testing of Store implementations.

@@ -41,20 +43,7 @@ abstract class StoreFunSuiteSupport exte * Handy helper to call an async method on the store and wait for * the result of the callback. */ - def CB[T](func: (T=>Unit)=>Unit ) = { - class X { - var value:T = _ - } - val rc = new X - val cd = new CountDownLatch(1) - def cb(x:T) = { - rc.value = x - cd.countDown - } - func(cb) - cd.await - rc.value - } + def data_directory = basedir / "target" / "apollo-data" @@ -75,15 +64,20 @@ abstract class StoreFunSuiteSupport exte } override protected def beforeEach() = { + purge + } + + def purge { val tracker = new LoggingTracker("store startup") val task = tracker.task("purge") store.purge(task.run) tracker.await } + def expectCB[T](expected:T)(func: (T=>Unit)=>Unit ) = { expect(expected) { - CB(func) + sync_cb(func) } } @@ -91,7 +85,7 @@ abstract class StoreFunSuiteSupport exte def add_queue(name:String):Long = { var queue_a = QueueRecord(queue_key_counter.incrementAndGet, ascii("test"), ascii(name)) - val rc:Boolean = CB( cb=> store.add_queue(queue_a)(cb) ) + val rc:Boolean = sync_cb( cb=> store.add_queue(queue_a)(cb) ) expect(true)(rc) queue_a.key } @@ -141,11 +135,62 @@ abstract class StoreFunSuiteSupport exte msg_keys } + test("export and import") { + val A = add_queue("A") + val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil) + + val rc:Option[MessageRecord] = sync_cb( cb=> store.load_message(msg_keys.head._1, msg_keys.head._2)(cb) ) + expect(ascii("message 1").buffer) { + rc.get.buffer + } + + val file = test_data_dir / "export.zip" + file.getParentFile.mkdirs() + using( new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(file)))) { out=> + val manager = ZipOputputStreamManager(out) + + // Export the data... + expect(None) { + sync_cb[Option[String]] { cb => + store.export_pb(manager, cb) + } + } + } + + // purge the data.. + purge + + // There should ne no queues.. + expectCB(Seq[Long]()) { cb=> + store.list_queues(cb) + } + + // Import the data.. + val zip = new ZipFile(file) + try { + val manager = ZipInputStreamManager(zip) + expect(None) { + sync_cb[Option[String]] { cb => + store.import_pb(manager, cb) + } + } + } finally { + zip.close + } + + // The data should be there now again.. 
+ val queues:Seq[Long] = sync_cb(store.list_queues(_)) + expect(1)(queues.size) + val entries:Seq[QueueEntryRecord] = sync_cb(cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb)) + expect(3) ( entries.size ) + + } + test("load stored message") { val A = add_queue("A") val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil) - val rc:Option[MessageRecord] = CB( cb=> store.load_message(msg_keys.head._1, msg_keys.head._2)(cb) ) + val rc:Option[MessageRecord] = sync_cb( cb=> store.load_message(msg_keys.head._1, msg_keys.head._2)(cb) ) expect(ascii("message 1").buffer) { rc.get.buffer } @@ -166,7 +211,7 @@ abstract class StoreFunSuiteSupport exte val A = add_queue("my queue name") populate(A, "message 1"::"message 2"::"message 3"::Nil) - val rc:Option[QueueRecord] = CB( cb=> store.get_queue(A)(cb) ) + val rc:Option[QueueRecord] = sync_cb( cb=> store.get_queue(A)(cb) ) expect(ascii("my queue name")) { rc.get.binding_data.ascii } @@ -176,7 +221,7 @@ abstract class StoreFunSuiteSupport exte val A = add_queue("A") val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil) - val rc:Seq[QueueEntryRecord] = CB( cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb) ) + val rc:Seq[QueueEntryRecord] = sync_cb( cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb) ) expect(msg_keys.toSeq.map(_._1)) { rc.map( _.message_key ) } Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala?rev=1234369&r1=1234368&r2=1234369&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala Sat Jan 21 17:13:57 2012 @@ -21,11 +21,12 @@ import org.apache.activemq.apollo.util.F import org.apache.activemq.apollo.dto.VirtualHostDTO import org.apache.activemq.apollo.util._ import java.util.zip.{ZipEntry, ZipOutputStream} -import org.apache.activemq.apollo.broker.store.{StreamManager, StoreFactory} -import java.io.{OutputStream, FileOutputStream, File} -import scala.util.continuations._ import org.apache.felix.service.command.CommandSession import org.apache.activemq.apollo.broker.ConfigStore +import java.io._ +import java.util.concurrent.CountDownLatch +import org.apache.activemq.apollo.broker.store.ZipOputputStreamManager._ +import org.apache.activemq.apollo.broker.store.{ZipOputputStreamManager, StreamManager, StoreFactory} /** * The apollo stop command @@ -41,8 +42,8 @@ class StoreExport extends Action { @option(name = "--virtual-host", description = "The id of the virtual host to export, if not specified, the default virtual host is selected.") var host: String = _ - @argument(name = "dest", description = "The destination file to hold the exported data", index=0, required=true) - var dest:File = _ + @argument(name = "file", description = "The zip file to hold the exported data", index=0, required=true) + var file:File = _ def execute(session: CommandSession):AnyRef = { import Helper._ @@ -79,27 +80,18 @@ class StoreExport extends Action { } ServiceControl.start(store, "store startup") - using( new ZipOutputStream(new FileOutputStream(dest))) { out=> + session.getConsole.println("Exporting... 
(this might take a while)") + using( new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(file)))) { out=> out.setMethod(ZipEntry.DEFLATED) out.setLevel(9) - val manager = new StreamManager[OutputStream]() { - def entry(name:String, func: (OutputStream) => Unit) = { - out.putNextEntry(new ZipEntry(name)); - func(out) - out.closeEntry(); - } - def using_queue_stream(func: (OutputStream) => Unit) = entry("queues.dat", func) - def using_queue_entry_stream(func: (OutputStream) => Unit) = entry("queue_entries.dat", func) - def using_message_stream(func: (OutputStream) => Unit) = entry("messages.dat", func) - def using_map_stream(func: (OutputStream) => Unit) = entry("map.dat", func) - } - reset { - val rc = store.export_pb(manager) - rc.failure_option.foreach(error _) - } - } + val manager = ZipOputputStreamManager(out) + sync_cb[Option[String]] { cb => + store.export_pb(manager, cb) + }.foreach(error _) + } ServiceControl.stop(store, "store stop"); + session.getConsole.println("Done. Export located at: "+file) } catch { case x:Failure=> Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala?rev=1234369&r1=1234368&r2=1234369&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala Sat Jan 21 17:13:57 2012 @@ -20,12 +20,13 @@ import org.apache.felix.gogo.commands.{A import org.apache.activemq.apollo.util.FileSupport._ import org.apache.activemq.apollo.dto.VirtualHostDTO import org.apache.activemq.apollo.util._ -import org.apache.activemq.apollo.broker.store.{StreamManager, StoreFactory} import scala.util.continuations._ import java.util.zip.ZipFile import java.io.{InputStream, File} import org.apache.felix.service.command.CommandSession import org.apache.activemq.apollo.broker.ConfigStore +import org.apache.activemq.apollo.broker.store.ZipInputStreamManager._ +import org.apache.activemq.apollo.broker.store.{ZipInputStreamManager, StreamManager, StoreFactory} /** @@ -39,11 +40,11 @@ class StoreImport extends Action { @option(name = "--conf", description = "The Apollo configuration file.") var conf: File = _ - @option(name = "--virtual-host", description = "The id of the virtual host to export, if not specified, the default virtual host is selected.") + @option(name = "--virtual-host", description = "The id of the virtual host to import into, if not specified, the default virtual host is selected.") var host: String = _ - @argument(name = "dest", description = "The destination file to hold the exported data", index=0, required=true) - var dest:File = _ + @argument(name = "file", description = "The zip file the contains that data for the import", index=0, required=true) + var file:File = _ def execute(session: CommandSession):AnyRef = { import Helper._ @@ -81,32 +82,19 @@ class StoreImport extends Action { ServiceControl.start(store, "store startup") - val zip = new ZipFile(dest) + session.getConsole.println("Importing: "+file) + val zip = new ZipFile(file) try { - val manager = new StreamManager[InputStream]() { - def entry(name:String, func: (InputStream) => Unit) = { - val entry = 
zip.getEntry(name) - if(entry == null) { - error("Invalid data file, zip entry not found: "+name); - } - using(zip.getInputStream(entry)) { is=> - func(is) - } - } - def using_queue_stream(func: (InputStream) => Unit) = entry("queues.dat", func) - def using_queue_entry_stream(func: (InputStream) => Unit) = entry("queue_entries.dat", func) - def using_message_stream(func: (InputStream) => Unit) = entry("messages.dat", func) - def using_map_stream(func: (InputStream) => Unit) = entry("map.dat", func) - } - reset { - val rc = store.import_pb(manager) - rc.failure_option.foreach(error _) - } + val manager = ZipInputStreamManager(zip) + sync_cb[Option[String]] { cb => + store.import_pb(manager, cb) + }.foreach(error _) } finally { zip.close } ServiceControl.stop(store, "store stop"); + session.getConsole.println("Done.") } catch { case x:Failure=> Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1234369&r1=1234368&r2=1234369&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Sat Jan 21 17:13:57 2012 @@ -30,7 +30,6 @@ import org.fusesource.hawtdispatch._ import org.apache.activemq.apollo.util.{TreeMap=>ApolloTreeMap} import collection.immutable.TreeMap import org.fusesource.leveldbjni.internal.Util -import org.fusesource.hawtbuf.{Buffer, AbstractVarIntSupport} import org.apache.activemq.apollo.broker.Broker import org.apache.activemq.apollo.util.ProcessSupport._ import collection.mutable.{HashMap, ListBuffer} @@ -39,6 +38,7 @@ import org.iq80.leveldb._ import org.apache.activemq.apollo.broker.store.leveldb.RecordLog.LogInfo import org.apache.activemq.apollo.broker.store.PBSupport import java.util.concurrent.atomic.AtomicReference +import org.fusesource.hawtbuf.{AsciiBuffer, Buffer, AbstractVarIntSupport} /** * @author Hiram Chirino @@ -1063,8 +1063,12 @@ class LevelDBClient(store: LevelDBStore) // } - def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = { + def export_pb(streams:StreamManager[OutputStream]):Option[String] = { try { + streams.using_version_stream{ stream=> + new AsciiBuffer("1").writeTo(stream) + } + retry_using_index { // Delete all the tmp keys.. @@ -1110,23 +1114,29 @@ class LevelDBClient(store: LevelDBStore) // Figure out the active log locations.. 
streams.using_queue_entry_stream { stream=> index.cursor_prefixed(queue_entry_prefix_array, ro) { (_, value) => - write_framed(stream, value) - val record = QueueEntryPB.FACTORY.parseUnframed(value) + val record = QueueEntryPB.FACTORY.parseUnframed(value).copy() val (pos, len) = decode_locator(record.getMessageLocator) + record.setMessageKey(pos) + write_framed(stream, record.freeze().toUnframedByteArray) index.put(encode_key(tmp_prefix, pos), encode_vlong(len)) true } } streams.using_message_stream { stream=> - index.cursor_prefixed(Array(tmp_prefix), ro) { (key, value) => + index.cursor_prefixed(Array(tmp_prefix)) { (key, value) => val (_, pos) = decode_long_key(key) val len = decode_vlong(value).toInt log.read(pos, len).foreach { value => // Set the message key to be the position in the log. - val pb = MessagePB.FACTORY.parseUnframed(value).copy - pb.setMessageKey(pos) - write_framed(stream, pb.freeze.toUnframedBuffer) + val copy = MessagePB.FACTORY.parseUnframed(value).copy + copy.setMessageKey(pos) + val data = copy.freeze.toUnframedBuffer + + val check = MessagePB.FACTORY.parseUnframed(data).copy + assert(check.getMessageKey == pos) + + write_framed(stream, data) } true } @@ -1141,14 +1151,26 @@ class LevelDBClient(store: LevelDBStore) } } - Success(Zilch) + None } catch { case x:Exception=> - Failure(x.getMessage) + Some(x.getMessage) } } - def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] = { + def import_pb(streams:StreamManager[InputStream]):Option[String] = { + try { + streams.using_version_stream { + stream => + var ver = read_text(stream).toInt + if (ver != 1) { + return Some("Cannot import from an export file at version: "+ver) + } + } + } catch { + case e => return Some("Could not determine export format version: "+e) + } + try { purge log_refs.clear() @@ -1169,6 +1191,7 @@ class LevelDBClient(store: LevelDBStore) } log.appender { appender => + streams.using_map_stream { stream=> foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb => index.put(encode_key(map_prefix, pb.getKey), pb.getValue.toByteArray) @@ -1192,14 +1215,17 @@ class LevelDBClient(store: LevelDBStore) streams.using_queue_entry_stream { stream=> foreach[QueueEntryPB.Buffer](stream, QueueEntryPB.FACTORY) { record=> val copy = record.copy(); - index.get(encode_key(tmp_prefix, record.getMessageKey)).foreach { locator=> + var original_msg_key: Long = record.getMessageKey + index.get(encode_key(tmp_prefix, original_msg_key)) match { + case Some(locator)=> val (pos, len) = decode_locator(locator) copy.setMessageLocator(locator) index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toUnframedBuffer) - log.log_info(pos).foreach { log_info => log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet() } + case None => + println("Invalid queue entry, references message that was not in the export: "+original_msg_key) } } } @@ -1214,11 +1240,11 @@ class LevelDBClient(store: LevelDBStore) } snapshot_index - Success(Zilch) + None } catch { case x:Exception=> - Failure(x.getMessage) + Some(x.getMessage) } } } Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1234369&r1=1234368&r2=1234369&view=diff 
============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala Sat Jan 21 17:13:57 2012 @@ -290,16 +290,16 @@ class LevelDBStore(val config:LevelDBSto * Exports the contents of the store to the provided streams. Each stream should contain * a list of framed protobuf objects with the corresponding object types. */ - def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable = write_executor ! { - client.export_pb(streams) + def export_pb(streams:StreamManager[OutputStream], cb:(Option[String])=>Unit) = write_executor { + cb(client.export_pb(streams)) } /** * Imports a previously exported set of streams. This deletes any previous data * in the store. */ - def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = write_executor ! { - client.import_pb(streams) + def import_pb(streams:StreamManager[InputStream], cb:(Option[String])=>Unit) = write_executor { + cb(client.import_pb(streams)) } } Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala?rev=1234369&r1=1234368&r2=1234369&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala (original) +++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala Sat Jan 21 17:13:57 2012 @@ -18,6 +18,7 @@ package org.apache.activemq.apollo import org.fusesource.hawtdispatch._ import org.fusesource.hawtdispatch.Future +import java.util.concurrent.CountDownLatch /** * @@ -69,5 +70,16 @@ package object util { } } + def sync_cb[T](func: (T=>Unit)=>Unit ) = { + var rc:Option[T] = null + val cd = new CountDownLatch(1) + def cb(x:T) = { + rc = Some(x) + cd.countDown + } + func(cb) + cd.await + rc.get + } } \ No newline at end of file Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala?rev=1234369&r1=1234368&r2=1234369&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala (original) +++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala Sat Jan 21 17:13:57 2012 @@ -50,4 +50,15 @@ class LongCounter(private var value:Long } override def toString() = get().toString + + override def equals(obj: Any): Boolean = { + obj match { + case obj:LongCounter => + obj.value == value + case _ => + false + } + } + + override def hashCode(): Int = value.hashCode() } \ No newline at end of file
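
As a usage-level illustration of the new versioned export format: the export writes a version.txt entry before the data entries, and the import reads it first and refuses files whose version it does not understand. Below is a minimal sketch using plain java.util.zip; the entry names match the ZipOputputStreamManager/ZipInputStreamManager classes added above, but the VersionedExportSketch object and its helpers are hypothetical, not part of this commit.

    import java.io.{BufferedOutputStream, File, FileOutputStream}
    import java.util.zip.{ZipEntry, ZipFile, ZipOutputStream}

    object VersionedExportSketch {
      val CurrentVersion = 1

      // Writes the version entry first, the way ZipOputputStreamManager's
      // using_version_stream does; the data entries would follow it.
      def write_export(file: File): Unit = {
        val out = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(file)))
        try {
          out.putNextEntry(new ZipEntry("version.txt"))
          out.write(CurrentVersion.toString.getBytes("UTF-8"))
          out.closeEntry()
          // queues.dat, queue_entries.dat, messages.dat and map.dat would be added here...
        } finally {
          out.close()
        }
      }

      // Returns None on success, Some(error) otherwise -- the same Option[String]
      // convention import_pb now uses to report failures.
      def check_version(file: File): Option[String] = {
        val zip = new ZipFile(file)
        try {
          val entry = zip.getEntry("version.txt")
          if (entry == null) return Some("Could not determine export format version")
          val ver = scala.io.Source.fromInputStream(zip.getInputStream(entry)).mkString.trim.toInt
          if (ver != CurrentVersion) Some("Cannot import from an export file at version: " + ver)
          else None
        } catch {
          case e: NumberFormatException => Some("Could not determine export format version: " + e)
        } finally {
          zip.close()
        }
      }
    }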