Author: chirino Date: Fri Dec 31 01:21:08 2010 New Revision: 1054036 URL: http://svn.apache.org/viewvc?rev=1054036&view=rev Log: initial pass at implementing store import/export. Added: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1054036&r1=1054035&r2=1054036&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original) +++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Fri Dec 31 01:21:08 2010 @@ -25,6 +25,8 @@ import collection.mutable.ListBuffer import org.apache.activemq.apollo.broker.store._ import org.apache.activemq.apollo.util._ import 
com.sleepycat.je._ +import java.io.{EOFException, InputStream, OutputStream} +import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessageFactory} object BDBClient extends Log /** @@ -384,4 +386,102 @@ class BDBClient(store: BDBStore) extends } } + def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = { + try { + with_ctx { ctx=> + import ctx._ + import PBSupport._ + + streams.using_queue_stream { queue_stream => + queues_db.cursor(tx) { (_, value) => + val record:QueueRecord = value + record.writeFramed(queue_stream) + true + } + } + + streams.using_message_stream { message_stream=> + messages_db.cursor(tx) { (_, value) => + val record:MessageRecord = value + record.writeFramed(message_stream) + true + } + } + + streams.using_queue_entry_stream { queue_entry_stream=> + queues_db.cursor(tx) { (_, value) => + val record:QueueRecord = value + with_entries_db(record.key) { entries_db=> + entries_db.cursor(tx) { (key, value) => + val record:QueueEntryRecord = value + record.writeFramed(queue_entry_stream) + true + } + } + true + } + } + + } + Success(Zilch) + } catch { + case x:Exception=> + Failure(x.getMessage) + } + } + + def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] = { + try { + purge + + def foreach[Buffer] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = { + var done = false + do { + try { + func(fact.parseFramed(stream).asInstanceOf[Buffer]) + } catch { + case x:EOFException => + done = true + } + } while( !done ) + } + + with_ctx { ctx=> + import ctx._ + import PBSupport._ + + streams.using_queue_stream { queue_stream=> + foreach(queue_stream, QueuePB.FACTORY) { pb=> + val record:QueueRecord = pb + queues_db.put(tx, record.key, record) + with_entries_db(record.key) { entriesdb=> + } + } + } + + streams.using_message_stream { message_stream=> + foreach(message_stream, MessagePB.FACTORY) { pb=> + val record:MessageRecord = pb + messages_db.put(tx, record.key, record) + } + } + + 
streams.using_queue_entry_stream { queue_entry_stream=> + foreach(queue_entry_stream, QueueEntryPB.FACTORY) { pb=> + val record:QueueEntryRecord = pb + + with_entries_db(record.queue_key) { entries_db=> + entries_db.put(tx, record.entry_seq, record) + add_and_get(message_refs_db, record.message_key, 1, tx) + } + } + } + } + Success(Zilch) + + } catch { + case x:Exception=> + Failure(x.getMessage) + } + } } Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1054036&r1=1054035&r2=1054036&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala (original) +++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala Fri Dec 31 01:21:08 2010 @@ -20,7 +20,6 @@ import dto.{BDBStoreDTO, BDBStoreStatusD import java.util.concurrent.atomic.AtomicLong import collection.Seq import org.fusesource.hawtdispatch._ -import java.io.File import java.util.concurrent._ import org.apache.activemq.apollo.broker.store._ import org.apache.activemq.apollo.util._ @@ -28,6 +27,8 @@ import ReporterLevel._ import org.fusesource.hawtdispatch.ListEventAggregator import org.apache.activemq.apollo.dto.{StoreStatusDTO, IntMetricDTO, TimeMetricDTO, StoreDTO} import org.apache.activemq.apollo.util.OptionSupport._ +import java.io.{InputStream, OutputStream, File} +import scala.util.continuations._ /** * @author Hiram Chirino @@ -268,4 +269,21 @@ class BDBStore extends DelayingStoreSupp callback(rc) } + + /** + * Exports the contents of the store to the provided streams. Each stream should contain + * a list of framed protobuf objects with the corresponding object types. 
+ */ + def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable = write_executor ! { + client.export_pb(streams) + } + + /** + * Imports a previously exported set of streams. This deletes any previous data + * in the store. + */ + def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = write_executor ! { + client.import_pb(streams) + } + } Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1054036&r1=1054035&r2=1054036&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala Fri Dec 31 01:21:08 2010 @@ -18,6 +18,14 @@ package org.apache.activemq.apollo.broke */ import org.apache.activemq.apollo.dto.{StoreStatusDTO, StoreDTO} import org.apache.activemq.apollo.util._ +import java.io.{InputStream, OutputStream} +import scala.util.continuations._ + +trait StreamManager[A] { + def using_queue_stream(func: (A)=>Unit) + def using_message_stream(func: (A)=>Unit) + def using_queue_entry_stream(func: (A)=>Unit) +} /** *

@@ -109,4 +117,15 @@ trait Store extends ServiceTrait { */ def load_message(messageKey:Long)(callback:(Option[MessageRecord])=>Unit ) + /** + * Exports the contents of the store to the provided streams. Each stream should contain + * a list of framed protobuf objects with the corresponding object types. + */ + def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable + + /** + * Imports a previously exported set of streams. This deletes any previous data + * in the store. + */ + def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable } \ No newline at end of file Modified: activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala?rev=1054036&r1=1054035&r2=1054036&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala (original) +++ activemq/activemq-apollo/trunk/apollo-cassandra/src/main/scala/org/apache/activemq/apollo/broker/store/cassandra/CassandraStore.scala Fri Dec 31 01:21:08 2010 @@ -32,6 +32,8 @@ import org.apache.activemq.apollo.broker import org.apache.activemq.apollo.util._ import ReporterLevel._ import org.apache.activemq.apollo.util.OptionSupport._ +import java.io.{InputStream, OutputStream} +import scala.util.continuations._ object CassandraStore extends Log { @@ -229,5 +231,19 @@ class CassandraStore extends DelayingSto } } + /** + * Exports the contents of the store to the provided streams. Each stream should contain + * a list of framed protobuf objects with the corresponding object types. 
+ */ + def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable = blocking ! { + Failure("not supported")// client.export_pb(queue_stream, message_stream, queue_entry_stream) + } + /** + * Imports a previously exported set of streams. This deletes any previous data + * in the store. + */ + def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = blocking ! { + Failure("not supported")//client.import_pb(queue_stream, message_stream, queue_entry_stream) + } } Added: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala?rev=1054036&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala (added) +++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala Fri Dec 31 01:21:08 2010 @@ -0,0 +1,115 @@ +package org.apache.activemq.apollo.cli.commands + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +import org.apache.felix.gogo.commands.{Action, Option => option, Argument => argument, Command => command} +import org.osgi.service.command.CommandSession +import org.apache.activemq.apollo.util.FileSupport._ +import org.apache.activemq.apollo.util.OptionSupport._ +import org.apache.commons.codec.binary.Base64 +import java.net.{HttpURLConnection, URL} +import org.apache.activemq.apollo.broker.{VirtualHost, FileConfigStore} +import org.apache.activemq.apollo.dto.{VirtualHostDTO, WebAdminDTO} +import org.apache.activemq.apollo.util.{ServiceControl, Log, DirectBufferPoolFactory, Logging} +import java.util.zip.{ZipEntry, ZipOutputStream, ZipFile} +import org.apache.activemq.apollo.broker.store.{StreamManager, StoreFactory} +import java.io.{OutputStream, FileOutputStream, File} + +object StoreExport extends Log + +/** + * The apollo store-export command + */ +@command(scope="apollo", name = "store-export", description = "exports the contents of a broker message store") +class StoreExport extends Action { + + @option(name = "--conf", description = "The Apollo configuration file.") + var conf: File = _ + + @option(name = "--virtual-host", description = "The id of the virtual host to export, if not specified, the default virtual host is selected.") + var host: String = _ + + @argument(name = "dest", description = "The destination file to hold the exported data", index=0, required=true) + var dest:File = _ + + def execute(session: CommandSession):AnyRef = { + import Helper._ + + try { + + val base = system_dir("apollo.base") + + if( conf == null ) { + conf = base / "etc" / "apollo.xml" + } + + if( !conf.exists ) { + error("Configuration file'%s' does not exist.\n\nTry creating a broker instance using the 'apollo create' command.".format(conf)); + } + + val config_store = new FileConfigStore + config_store.file = conf + config_store.start + val config = 
config_store.load(true) + + val hosts = collection.JavaConversions.asScalaIterable(config.virtual_hosts).toArray + val vho:Option[VirtualHostDTO] = if( host==null ) { + hosts.headOption + } else { + hosts.filter( _.id == host ).headOption + } + + val vh = vho.getOrElse(error("Could find host to export")) + if( vh.store == null ) { + error("The virtual host '%s' does not have a store configured.".format(vh.id)) + } + + val store = StoreFactory.create(vh.store) + if( store==null ) { + error("Could not create the store.") + } + + store.configure(config.store, LoggingReporter(StoreExport)) + ServiceControl.start(store, "store startup") + + + using( new ZipOutputStream(new FileOutputStream(dest))) { out=> + out.setMethod(ZipEntry.DEFLATED) + out.setLevel(9) + store.export_pb(new StreamManager[OutputStream]() { + def entry(name:String, func: (OutputStream) => Unit) = { + out.putNextEntry(new ZipEntry(name)); + func(out) + out.closeEntry(); + } + def using_queue_stream(func: (OutputStream) => Unit) = entry("queues.dat", func) + def using_queue_entry_stream(func: (OutputStream) => Unit) = entry("queue_entries.dat", func) + def using_message_stream(func: (OutputStream) => Unit) = entry("messages.dat", func) + }) + } + + ServiceControl.stop(store, "store stop"); + + } catch { + case x:Failure=> + error(x.getMessage) + } + null + } + + +} \ No newline at end of file Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1054036&r1=1054035&r2=1054036&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala (original) +++ 
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala Fri Dec 31 01:21:08 2010 @@ -30,6 +30,8 @@ import org.apache.activemq.apollo.util._ import ReporterLevel._ import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained, ListEventAggregator} import org.apache.activemq.apollo.util.OptionSupport._ +import java.io.{InputStream, OutputStream} +import scala.util.continuations._ object HawtDBStore extends Log { val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; @@ -286,4 +288,20 @@ class HawtDBStore extends DelayingStoreS callback(rc) } + + /** + * Exports the contents of the store to the provided streams. Each stream should contain + * a list of framed protobuf objects with the corresponding object types. + */ + def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable = executor_pool ! { + Failure("not supported")// client.export_pb(queue_stream, message_stream, queue_entry_stream) + } + + /** + * Imports a previously exported set of streams. This deletes any previous data + * in the store. + */ + def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = executor_pool ! 
{ + Failure("not supported")//client.import_pb(queue_stream, message_stream, queue_entry_stream) + } } Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1054036&r1=1054035&r2=1054036&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala (original) +++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala Fri Dec 31 01:21:08 2010 @@ -27,9 +27,10 @@ import jdbm._ import btree.BTree import htree.HTree import java.util.Comparator -import java.io.Serializable import jdbm.helper._ import PBSupport._ +import org.fusesource.hawtbuf.proto.PBMessageFactory +import java.io.{EOFException, InputStream, OutputStream, Serializable} object JDBM2Client extends Log { @@ -408,4 +409,106 @@ class JDBM2Client(store: JDBM2Store) { def getLastQueueKey:Long = last_queue_key + + def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = { + try { + import PBSupport._ + + streams.using_queue_stream { queue_stream=> + queues_db.cursor { (_, value) => + val record:QueueRecord = value + record.writeFramed(queue_stream) + true + } + } + streams.using_message_stream { message_stream=> + messages_db.cursor { (_, value) => + val record:MessageRecord = value + record.writeFramed(message_stream) + true + } + } + + streams.using_queue_entry_stream { queue_entry_stream=> + entries_db.cursor { (_, value) => + val record:QueueEntryRecord = value + record.writeFramed(queue_entry_stream) + true + } + } + Success(Zilch) + + } catch { + case x:Exception=> + Failure(x.getMessage) + } + } + + def 
import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] = { + try { + purge + + var size =0 + def check_flush(incr:Int, max:Int) = { + size += incr + if( size > max ) { + recman.commit + size = 0 + } + } + + transaction { + + def foreach[Buffer] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = { + var done = false + do { + try { + func(fact.parseFramed(stream).asInstanceOf[Buffer]) + } catch { + case x:EOFException => + done = true + } + } while( !done ) + } + + + import PBSupport._ + + streams.using_queue_stream { queue_stream=> + foreach(queue_stream, QueuePB.FACTORY) { pb=> + val record:QueueRecord = pb + queues_db.put(record.key, record) + check_flush(1, 10000) + } + } + + recman.commit + + streams.using_message_stream { message_stream=> + foreach(message_stream, MessagePB.FACTORY) { pb=> + val record:MessageRecord = pb + messages_db.put(record.key, record) + check_flush(record.size, 1024*124*10) + } + } + + recman.commit + + streams.using_queue_entry_stream { queue_entry_stream=> + foreach(queue_entry_stream, QueueEntryPB.FACTORY) { pb=> + val record:QueueEntryRecord = pb + entries_db.insert((record.queue_key, record.entry_seq), record, true) + add_message_reference(record.message_key) + check_flush(1, 10000) + } + } + + } + Success(Zilch) + + } catch { + case x:Exception=> + Failure(x.getMessage) + } + } } Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1054036&r1=1054035&r2=1054036&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala (original) +++ 
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala Fri Dec 31 01:21:08 2010 @@ -28,6 +28,7 @@ import ReporterLevel._ import org.fusesource.hawtdispatch.ListEventAggregator import org.apache.activemq.apollo.dto.{StoreStatusDTO, IntMetricDTO, TimeMetricDTO, StoreDTO} import org.apache.activemq.apollo.util.OptionSupport._ +import java.io.{InputStream, OutputStream} import scala.util.continuations._ /** @@ -287,4 +288,20 @@ class JDBM2Store extends DelayingStoreSu callback(rc) } + + /** + * Exports the contents of the store to the provided streams. Each stream should contain + * a list of framed protobuf objects with the corresponding object types. + */ + def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] @suspendable = executor ! { + client.export_pb(streams) + } + + /** + * Imports a previously exported set of streams. This deletes any previous data + * in the store. + */ + def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] @suspendable = executor ! { + client.import_pb(streams) + } }