activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1054036 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-cassandra/src/main/scala/org/apache/activemq/...
Date Fri, 31 Dec 2010 01:21:08 GMT
Author: chirino
Date: Fri Dec 31 01:21:08 2010
New Revision: 1054036

URL: http://svn.apache.org/viewvc?rev=1054036&view=rev
Log:
initial pass at implementing store import/export.

Added:
    activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
    activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1054036&r1=1054035&r2=1054036&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
Fri Dec 31 01:21:08 2010
@@ -25,6 +25,8 @@ import collection.mutable.ListBuffer
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
 import com.sleepycat.je._
+import java.io.{EOFException, InputStream, OutputStream}
+import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessageFactory}
 
 object BDBClient extends Log
 /**
@@ -384,4 +386,102 @@ class BDBClient(store: BDBStore) extends
     }
   }
 
+  /**
+   * Writes the store contents as framed protobuf records: every queue record
+   * goes to the queue stream, every message record to the message stream, and
+   * then the entry records of each queue go to the queue entry stream.
+   *
+   * @param streams supplies the output stream for each record type
+   * @return Success(Zilch) once all three passes complete, otherwise a Failure
+   *         holding a description of the exception that aborted the export
+   */
+  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = {
+    try {
+      with_ctx { ctx=>
+        import ctx._
+        import PBSupport._
+
+        streams.using_queue_stream { queue_stream =>
+          queues_db.cursor(tx) { (_, value) =>
+            val record:QueueRecord = value
+            record.writeFramed(queue_stream)
+            // presumably `true` tells the cursor to keep iterating -- TODO confirm
+            true
+          }
+        }
+
+        streams.using_message_stream { message_stream=>
+          messages_db.cursor(tx) { (_, value) =>
+            val record:MessageRecord = value
+            record.writeFramed(message_stream)
+            true
+          }
+        }
+
+        streams.using_queue_entry_stream { queue_entry_stream=>
+          // Walk the queues a second time so each queue's entries db can be dumped.
+          queues_db.cursor(tx) { (_, value) =>
+            val queue:QueueRecord = value
+            with_entries_db(queue.key) { entries_db=>
+              entries_db.cursor(tx) { (_, entry_value) =>
+                val entry:QueueEntryRecord = entry_value
+                entry.writeFramed(queue_entry_stream)
+                true
+              }
+            }
+            true
+          }
+        }
+
+      }
+      Success(Zilch)
+    } catch {
+      case x:Exception=>
+        // getMessage is null for many exceptions (e.g. NullPointerException);
+        // fall back to toString so the caller never receives Failure(null).
+        val message = x.getMessage
+        Failure(if( message==null ) x.toString else message)
+    }
+  }
+
+  /**
+   * Replaces the store contents with the framed protobuf records read from the
+   * provided streams.  The store is purged first, then queue records, message
+   * records and queue entry records are loaded in that order.
+   *
+   * @param streams supplies the input stream for each record type
+   * @return Success(Zilch) when every stream was consumed, otherwise a Failure
+   *         holding a description of the exception that aborted the import
+   */
+  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] = {
+    try {
+      purge
+
+      // Parses framed records from `stream` and hands each one to `func`
+      // until the parser signals the end of the stream with an EOFException.
+      // NOTE(review): `Buffer` is not tied to `fact` by the signature; confirm
+      // it resolves to the factory's buffer type at each call site.
+      def foreach[Buffer] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = {
+        var done = false
+        do {
+          try {
+            func(fact.parseFramed(stream).asInstanceOf[Buffer])
+          } catch {
+            case x:EOFException =>
+              done = true
+          }
+        } while( !done )
+      }
+
+      with_ctx { ctx=>
+        import ctx._
+        import PBSupport._
+
+        streams.using_queue_stream { queue_stream=>
+          foreach(queue_stream, QueuePB.FACTORY) { pb=>
+            val record:QueueRecord = pb
+            queues_db.put(tx, record.key, record)
+            // Intentionally empty body; presumably this just makes sure the
+            // queue's entries db gets created -- TODO confirm.
+            with_entries_db(record.key) { entriesdb=>
+            }
+          }
+        }
+
+        streams.using_message_stream { message_stream=>
+          foreach(message_stream, MessagePB.FACTORY) { pb=>
+            val record:MessageRecord = pb
+            messages_db.put(tx, record.key, record)
+          }
+        }
+
+        streams.using_queue_entry_stream { queue_entry_stream=>
+          foreach(queue_entry_stream, QueueEntryPB.FACTORY) { pb=>
+            val record:QueueEntryRecord = pb
+
+            with_entries_db(record.queue_key) { entries_db=>
+              entries_db.put(tx, record.entry_seq, record)
+              // Each imported entry adds one to the reference counter of its message.
+              add_and_get(message_refs_db, record.message_key, 1, tx)
+            }
+          }
+        }
+      }
+      Success(Zilch)
+
+    } catch {
+      case x:Exception=>
+        // getMessage is null for many exceptions (e.g. NullPointerException);
+        // fall back to toString so the caller never receives Failure(null).
+        val message = x.getMessage
+        Failure(if( message==null ) x.toString else message)
+    }
+  }
 }

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1054036&r1=1054035&r2=1054036&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
Fri Dec 31 01:21:08 2010
@@ -20,7 +20,6 @@ import dto.{BDBStoreDTO, BDBStoreStatusD
 import java.util.concurrent.atomic.AtomicLong
 import collection.Seq
 import org.fusesource.hawtdispatch._
-import java.io.File
 import java.util.concurrent._
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
@@ -28,6 +27,8 @@ import ReporterLevel._
 import org.fusesource.hawtdispatch.ListEventAggregator
 import org.apache.activemq.apollo.dto.{StoreStatusDTO, IntMetricDTO, TimeMetricDTO, StoreDTO}
 import org.apache.activemq.apollo.util.OptionSupport._
+import java.io.{InputStream, OutputStream, File}
+import scala.util.continuations._
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -268,4 +269,21 @@ class BDBStore extends DelayingStoreSupp
 
     callback(rc)
   }
+
+  /**
+   * Exports the contents of the store to the provided streams.  Each stream should contain
+   * a list of framed protobuf objects with the corresponding object types.
+   *
+   * Delegates to client.export_pb, submitted through `write_executor ! { ... }`.
+   */
+  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable = write_executor ! {
+    client.export_pb(streams)
+  }
+
+  /**
+   * Imports a previously exported set of streams.  This deletes any previous data
+   * in the store.
+   *
+   * Delegates to client.import_pb, submitted through `write_executor ! { ... }`.
+   */
+  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = write_executor ! {
+    client.import_pb(streams)
+  }
+
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1054036&r1=1054035&r2=1054036&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
Fri Dec 31 01:21:08 2010
@@ -18,6 +18,14 @@ package org.apache.activemq.apollo.broke
  */
 import org.apache.activemq.apollo.dto.{StoreStatusDTO, StoreDTO}
 import org.apache.activemq.apollo.util._
+import java.io.{InputStream, OutputStream}
+import scala.util.continuations._
+
+/**
+ * Hands out the three streams used by store export/import.  Each method
+ * passes the stream for one record type to the supplied function.
+ *
+ * @tparam A the stream type (the stores in this change use OutputStream for
+ *           export and InputStream for import)
+ */
+trait StreamManager[A] {
+  /** Invokes func with the stream carrying queue records. */
+  def using_queue_stream(func: (A)=>Unit): Unit
+  /** Invokes func with the stream carrying message records. */
+  def using_message_stream(func: (A)=>Unit): Unit
+  /** Invokes func with the stream carrying queue entry records. */
+  def using_queue_entry_stream(func: (A)=>Unit): Unit
+}
 
 /**
  * <p>
@@ -109,4 +117,15 @@ trait Store extends ServiceTrait {
    */
   def load_message(messageKey:Long)(callback:(Option[MessageRecord])=>Unit )
 
+  /**
+   * Exports the contents of the store to the provided streams.  Each stream should contain
+   * a list of framed protobuf objects with the corresponding object types.
+   *
+   * @param streams provides the queue, message and queue entry output streams
+   * @return a Result carrying Zilch on success or an error message String on failure
+   */
+  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable
+
+  /**
+   * Imports a previously exported set of streams.  This deletes any previous data
+   * in the store.
+   *
+   * @param streams provides the queue, message and queue entry input streams
+   * @return a Result carrying Zilch on success or an error message String on failure
+   */
+  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala?rev=1054036&r1=1054035&r2=1054036&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala
Fri Dec 31 01:21:08 2010
@@ -32,6 +32,8 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.apollo.util._
 import ReporterLevel._
 import org.apache.activemq.apollo.util.OptionSupport._
+import java.io.{InputStream, OutputStream}
+import scala.util.continuations._
 
 object CassandraStore extends Log {
 
@@ -229,5 +231,19 @@ class CassandraStore extends DelayingSto
     }
   }
 
+  /**
+   * Export hook required by the Store trait.  Export is not implemented for
+   * this store yet: the call always yields Failure("not supported") and the
+   * provided streams are never touched.
+   */
+  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable = blocking ! {
+    // TODO: delegate to the client once it gains an export_pb(streams) implementation.
+    Failure("not supported")
+  }
 
+  /**
+   * Import hook required by the Store trait.  Import is not implemented for
+   * this store yet: the call always yields Failure("not supported"), no data
+   * is deleted and the provided streams are never touched.
+   */
+  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = blocking ! {
+    // TODO: delegate to the client once it gains an import_pb(streams) implementation.
+    Failure("not supported")
+  }
 }

Added: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala?rev=1054036&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
Fri Dec 31 01:21:08 2010
@@ -0,0 +1,115 @@
+package org.apache.activemq.apollo.cli.commands
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.felix.gogo.commands.{Action, Option => option, Argument => argument,
Command => command}
+import org.osgi.service.command.CommandSession
+import org.apache.activemq.apollo.util.FileSupport._
+import org.apache.activemq.apollo.util.OptionSupport._
+import org.apache.commons.codec.binary.Base64
+import java.net.{HttpURLConnection, URL}
+import org.apache.activemq.apollo.broker.{VirtualHost, FileConfigStore}
+import org.apache.activemq.apollo.dto.{VirtualHostDTO, WebAdminDTO}
+import org.apache.activemq.apollo.util.{ServiceControl, Log, DirectBufferPoolFactory, Logging}
+import java.util.zip.{ZipEntry, ZipOutputStream, ZipFile}
+import org.apache.activemq.apollo.broker.store.{StreamManager, StoreFactory}
+import java.io.{OutputStream, FileOutputStream, File}
+
+object StoreExport extends Log
+
+/**
+ * The apollo store-export command: starts the message store of one virtual
+ * host and writes its contents into a zip file with the entries
+ * queues.dat, messages.dat and queue_entries.dat.
+ */
+@command(scope="apollo", name = "store-export", description = "exports the contents of a broker message store")
+class StoreExport extends Action {
+
+  @option(name = "--conf", description = "The Apollo configuration file.")
+  var conf: File = _
+
+  @option(name = "--virtual-host", description = "The id of the virtual host to export, if not specified, the default virtual host is selected.")
+  var host: String = _
+
+  @argument(name = "dest", description = "The destination file to hold the exported data", index=0, required=true)
+  var dest:File = _
+
+  /**
+   * Loads the broker configuration, selects the virtual host, starts its
+   * store and exports it to `dest`.  Always returns null.
+   */
+  def execute(session: CommandSession):AnyRef = {
+    import Helper._
+
+    try {
+
+      val base = system_dir("apollo.base")
+
+      // Default to <apollo.base>/etc/apollo.xml when --conf was not given.
+      if( conf == null ) {
+        conf = base / "etc" / "apollo.xml"
+      }
+
+      // NOTE(review): error(...) presumably throws the Failure caught below -- confirm in Helper.
+      if( !conf.exists ) {
+        error("Configuration file '%s' does not exist.\n\nTry creating a broker instance using the 'apollo create' command.".format(conf));
+      }
+
+      val config_store = new FileConfigStore
+      config_store.file = conf
+      config_store.start
+      val config = config_store.load(true)
+
+      // Pick the requested virtual host, or the first one when none was named.
+      val hosts = collection.JavaConversions.asScalaIterable(config.virtual_hosts).toArray
+      val vho:Option[VirtualHostDTO] = if( host==null ) {
+        hosts.headOption
+      } else {
+        hosts.find( _.id == host )
+      }
+
+      val vh = vho.getOrElse(error("Could not find host to export"))
+      if( vh.store == null ) {
+        error("The virtual host '%s' does not have a store configured.".format(vh.id))
+      }
+
+      val store = StoreFactory.create(vh.store)
+      if( store==null ) {
+        error("Could not create the store.")
+      }
+
+      // Configure the store with the same DTO it was created from.
+      store.configure(vh.store, LoggingReporter(StoreExport))
+      ServiceControl.start(store, "store startup")
+
+
+      using( new ZipOutputStream(new FileOutputStream(dest))) { out=>
+        out.setMethod(ZipEntry.DEFLATED)
+        out.setLevel(9)
+        // NOTE(review): the Result returned by export_pb is not inspected here,
+        // so a failed export is not reported to the user.
+        store.export_pb(new StreamManager[OutputStream]() {
+          // Wraps func in its own zip entry; all entries share the one zip stream.
+          def entry(name:String, func: (OutputStream) => Unit) = {
+            out.putNextEntry(new ZipEntry(name));
+            func(out)
+            out.closeEntry();
+          }
+          def using_queue_stream(func: (OutputStream) => Unit) = entry("queues.dat", func)
+          def using_queue_entry_stream(func: (OutputStream) => Unit) = entry("queue_entries.dat", func)
+          def using_message_stream(func: (OutputStream) => Unit) = entry("messages.dat", func)
+        })
+      }
+
+      ServiceControl.stop(store, "store stop");
+
+    } catch {
+      case x:Failure=>
+        error(x.getMessage)
+    }
+    null
+  }
+
+
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1054036&r1=1054035&r2=1054036&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
Fri Dec 31 01:21:08 2010
@@ -30,6 +30,8 @@ import org.apache.activemq.apollo.util._
 import ReporterLevel._
 import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained, ListEventAggregator}
 import org.apache.activemq.apollo.util.OptionSupport._
+import java.io.{InputStream, OutputStream}
+import scala.util.continuations._
 
 object HawtDBStore extends Log {
   val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -286,4 +288,20 @@ class HawtDBStore extends DelayingStoreS
 
     callback(rc)
   }
+
+  /**
+   * Export hook required by the Store trait.  Export is not implemented for
+   * this store yet: the call always yields Failure("not supported") and the
+   * provided streams are never touched.
+   */
+  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable = executor_pool ! {
+    // TODO: delegate to the client once it gains an export_pb(streams) implementation.
+    Failure("not supported")
+  }
+
+  /**
+   * Import hook required by the Store trait.  Import is not implemented for
+   * this store yet: the call always yields Failure("not supported"), no data
+   * is deleted and the provided streams are never touched.
+   */
+  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = executor_pool ! {
+    // TODO: delegate to the client once it gains an import_pb(streams) implementation.
+    Failure("not supported")
+  }
 }

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1054036&r1=1054035&r2=1054036&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
Fri Dec 31 01:21:08 2010
@@ -27,9 +27,10 @@ import jdbm._
 import btree.BTree
 import htree.HTree
 import java.util.Comparator
-import java.io.Serializable
 import jdbm.helper._
 import PBSupport._
+import org.fusesource.hawtbuf.proto.PBMessageFactory
+import java.io.{EOFException, InputStream, OutputStream, Serializable}
 
 object JDBM2Client extends Log {
 
@@ -408,4 +409,106 @@ class JDBM2Client(store: JDBM2Store) {
 
   def getLastQueueKey:Long = last_queue_key
 
+
+  /**
+   * Writes the store contents as framed protobuf records: every queue record
+   * goes to the queue stream, every message record to the message stream and
+   * every queue entry record to the queue entry stream.
+   *
+   * @param streams supplies the output stream for each record type
+   * @return Success(Zilch) once all three passes complete, otherwise a Failure
+   *         holding a description of the exception that aborted the export
+   */
+  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = {
+    try {
+      import PBSupport._
+
+      streams.using_queue_stream { queue_stream=>
+        queues_db.cursor { (_, value) =>
+          val record:QueueRecord = value
+          record.writeFramed(queue_stream)
+          // presumably `true` tells the cursor to keep iterating -- TODO confirm
+          true
+        }
+      }
+      streams.using_message_stream { message_stream=>
+        messages_db.cursor { (_, value) =>
+          val record:MessageRecord = value
+          record.writeFramed(message_stream)
+          true
+        }
+      }
+
+      streams.using_queue_entry_stream { queue_entry_stream=>
+        entries_db.cursor { (_, value) =>
+          val record:QueueEntryRecord = value
+          record.writeFramed(queue_entry_stream)
+          true
+        }
+      }
+      Success(Zilch)
+
+    } catch {
+      case x:Exception=>
+        // getMessage is null for many exceptions (e.g. NullPointerException);
+        // fall back to toString so the caller never receives Failure(null).
+        val message = x.getMessage
+        Failure(if( message==null ) x.toString else message)
+    }
+  }
+
+  /**
+   * Replaces the store contents with the framed protobuf records read from the
+   * provided streams.  The store is purged first, then queue records, message
+   * records and queue entry records are loaded in that order, with commits
+   * issued along the way.
+   *
+   * @param streams supplies the input stream for each record type
+   * @return Success(Zilch) when every stream was consumed, otherwise a Failure
+   *         holding a description of the exception that aborted the import
+   */
+  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] = {
+    try {
+      purge
+
+      // Running total of uncommitted work; check_flush commits and resets it
+      // once the total exceeds `max`.
+      var size =0
+      def check_flush(incr:Int, max:Int) = {
+        size += incr
+        if( size > max ) {
+          recman.commit
+          size = 0
+        }
+      }
+
+      transaction {
+
+        // Parses framed records from `stream` and hands each one to `func`
+        // until the parser signals the end of the stream with an EOFException.
+        // NOTE(review): `Buffer` is not tied to `fact` by the signature; confirm
+        // it resolves to the factory's buffer type at each call site.
+        def foreach[Buffer] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = {
+          var done = false
+          do {
+            try {
+              func(fact.parseFramed(stream).asInstanceOf[Buffer])
+            } catch {
+              case x:EOFException =>
+                done = true
+            }
+          } while( !done )
+        }
+
+
+        import PBSupport._
+
+        streams.using_queue_stream { queue_stream=>
+          foreach(queue_stream, QueuePB.FACTORY) { pb=>
+            val record:QueueRecord = pb
+            queues_db.put(record.key, record)
+            check_flush(1, 10000)
+          }
+        }
+
+        recman.commit
+
+        streams.using_message_stream { message_stream=>
+          foreach(message_stream, MessagePB.FACTORY) { pb=>
+            val record:MessageRecord = pb
+            messages_db.put(record.key, record)
+            // Commit roughly every 10 MiB of message data
+            // (the previous 1024*124*10 looked like a typo).
+            check_flush(record.size, 1024*1024*10)
+          }
+        }
+
+        recman.commit
+
+        streams.using_queue_entry_stream { queue_entry_stream=>
+          foreach(queue_entry_stream, QueueEntryPB.FACTORY) { pb=>
+            val record:QueueEntryRecord = pb
+            entries_db.insert((record.queue_key, record.entry_seq), record, true)
+            add_message_reference(record.message_key)
+            check_flush(1, 10000)
+          }
+        }
+
+      }
+      Success(Zilch)
+
+    } catch {
+      case x:Exception=>
+        // getMessage is null for many exceptions (e.g. NullPointerException);
+        // fall back to toString so the caller never receives Failure(null).
+        val message = x.getMessage
+        Failure(if( message==null ) x.toString else message)
+    }
+  }
 }

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1054036&r1=1054035&r2=1054036&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
Fri Dec 31 01:21:08 2010
@@ -28,6 +28,7 @@ import ReporterLevel._
 import org.fusesource.hawtdispatch.ListEventAggregator
 import org.apache.activemq.apollo.dto.{StoreStatusDTO, IntMetricDTO, TimeMetricDTO, StoreDTO}
 import org.apache.activemq.apollo.util.OptionSupport._
+import java.io.{InputStream, OutputStream}
 import scala.util.continuations._
 
 /**
@@ -287,4 +288,20 @@ class JDBM2Store extends DelayingStoreSu
 
     callback(rc)
   }
+
+  /**
+   * Exports the contents of the store to the provided streams.  Each stream should contain
+   * a list of framed protobuf objects with the corresponding object types.
+   *
+   * Delegates to client.export_pb, submitted through `executor ! { ... }`.
+   */
+  def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable = executor ! {
+    client.export_pb(streams)
+  }
+
+  /**
+   * Imports a previously exported set of streams.  This deletes any previous data
+   * in the store.
+   *
+   * Delegates to client.import_pb, submitted through `executor ! { ... }`.
+   */
+  def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = executor ! {
+    client.import_pb(streams)
+  }
 }



Mime
View raw message