activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1054038 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-cli/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-cli/src/main/scala/org/apache/activemq/...
Date Fri, 31 Dec 2010 01:21:29 GMT
Author: chirino
Date: Fri Dec 31 01:21:29 2010
New Revision: 1054038

URL: http://svn.apache.org/viewvc?rev=1054038&view=rev
Log:
The store-import and store-export commands now work for the bdb and jdbm2 stores.

Added:
    activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
      - copied, changed from r1054037, activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/META-INF/services/org.apache.activemq.apollo/commands.index
    activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Stop.scala
    activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1054038&r1=1054037&r2=1054038&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
Fri Dec 31 01:21:29 2010
@@ -451,7 +451,7 @@ class BDBClient(store: BDBStore) extends
         import PBSupport._
 
         streams.using_queue_stream { queue_stream=>
-          foreach(queue_stream, QueuePB.FACTORY) { pb=>
+          foreach[QueuePB.Buffer](queue_stream, QueuePB.FACTORY) { pb=>
             val record:QueueRecord = pb
             queues_db.put(tx, record.key, record)
             with_entries_db(record.key) { entriesdb=>
@@ -460,14 +460,14 @@ class BDBClient(store: BDBStore) extends
         }
 
         streams.using_message_stream { message_stream=>
-          foreach(message_stream, MessagePB.FACTORY) { pb=>
+          foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=>
             val record:MessageRecord = pb
             messages_db.put(tx, record.key, record)
           }
         }
 
         streams.using_queue_entry_stream { queue_entry_stream=>
-          foreach(queue_entry_stream, QueueEntryPB.FACTORY) { pb=>
+          foreach[QueueEntryPB.Buffer](queue_entry_stream, QueueEntryPB.FACTORY) { pb=>
             val record:QueueEntryRecord = pb
 
             with_entries_db(record.queue_key) { entries_db=>

Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/META-INF/services/org.apache.activemq.apollo/commands.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/META-INF/services/org.apache.activemq.apollo/commands.index?rev=1054038&r1=1054037&r2=1054038&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/META-INF/services/org.apache.activemq.apollo/commands.index
(original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/resources/META-INF/services/org.apache.activemq.apollo/commands.index
Fri Dec 31 01:21:29 2010
@@ -22,3 +22,5 @@ org.apache.activemq.apollo.cli.commands.
 org.apache.activemq.apollo.cli.commands.Run
 org.apache.activemq.apollo.cli.commands.Encrypt
 org.apache.activemq.apollo.cli.commands.Decrypt
+org.apache.activemq.apollo.cli.commands.StoreExport
+org.apache.activemq.apollo.cli.commands.StoreImport

Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Stop.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Stop.scala?rev=1054038&r1=1054037&r2=1054038&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Stop.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/Stop.scala
Fri Dec 31 01:21:29 2010
@@ -56,11 +56,7 @@ class Stop extends Action with Logging {
        error("Configuration file'%s' does not exist.\n\nTry creating a broker instance using the 'apollo create' command.".format(conf));
       }
 
-      val store = new FileConfigStore
-      store.file = conf
-      store.start
-      val config = store.load(true)
-
+      val config = new FileConfigStore(conf).load(true)
 
       val web_admin = config.web_admin.getOrElse(new WebAdminDTO)
       if( web_admin.enabled.getOrElse(true) ) {

Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala?rev=1054038&r1=1054037&r2=1054038&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
Fri Dec 31 01:21:29 2010
@@ -19,17 +19,13 @@ package org.apache.activemq.apollo.cli.c
 import org.apache.felix.gogo.commands.{Action, Option => option, Argument => argument, Command => command}
 import org.osgi.service.command.CommandSession
 import org.apache.activemq.apollo.util.FileSupport._
-import org.apache.activemq.apollo.util.OptionSupport._
-import org.apache.commons.codec.binary.Base64
-import java.net.{HttpURLConnection, URL}
-import org.apache.activemq.apollo.broker.{VirtualHost, FileConfigStore}
-import org.apache.activemq.apollo.dto.{VirtualHostDTO, WebAdminDTO}
-import org.apache.activemq.apollo.util.{ServiceControl, Log, DirectBufferPoolFactory, Logging}
-import java.util.zip.{ZipEntry, ZipOutputStream, ZipFile}
+import org.apache.activemq.apollo.broker.FileConfigStore
+import org.apache.activemq.apollo.dto.VirtualHostDTO
+import org.apache.activemq.apollo.util._
+import java.util.zip.{ZipEntry, ZipOutputStream}
 import org.apache.activemq.apollo.broker.store.{StreamManager, StoreFactory}
 import java.io.{OutputStream, FileOutputStream, File}
-
-object StoreExport extends Log
+import scala.util.continuations._
 
 /**
  * The apollo stop command
@@ -37,6 +33,8 @@ object StoreExport extends Log
 @command(scope="apollo", name = "store-export", description = "exports the contents of a broker message store")
 class StoreExport extends Action {
 
+  object StoreExport extends Log
+
   @option(name = "--conf", description = "The Apollo configuration file.")
   var conf: File = _
 
@@ -61,10 +59,7 @@ class StoreExport extends Action {
         error("Configuration file'%s' does not exist.\n\nTry creating a broker instance using the 'apollo create' command.".format(conf));
       }
 
-      val config_store = new FileConfigStore
-      config_store.file = conf
-      config_store.start
-      val config = config_store.load(true)
+      val config = new FileConfigStore(conf).load(true)
 
       val hosts = collection.JavaConversions.asScalaIterable(config.virtual_hosts).toArray
       val vho:Option[VirtualHostDTO] = if( host==null ) {
@@ -83,14 +78,14 @@ class StoreExport extends Action {
         error("Could not create the store.")
       }
 
-      store.configure(config.store, LoggingReporter(StoreExport))
+      store.configure(vh.store, LoggingReporter(StoreExport))
       ServiceControl.start(store, "store startup")
 
 
       using( new ZipOutputStream(new FileOutputStream(dest))) { out=>
         out.setMethod(ZipEntry.DEFLATED)
         out.setLevel(9)
-        store.export_pb(new StreamManager[OutputStream]() {
+        val manager = new StreamManager[OutputStream]() {
           def entry(name:String, func: (OutputStream) => Unit) = {
             out.putNextEntry(new ZipEntry(name));
             func(out)
@@ -99,7 +94,11 @@ class StoreExport extends Action {
           def using_queue_stream(func: (OutputStream) => Unit) = entry("queues.dat", func)
           def using_queue_entry_stream(func: (OutputStream) => Unit) = entry("queue_entries.dat", func)
           def using_message_stream(func: (OutputStream) => Unit) = entry("messages.dat", func)
-        })
+        }
+        reset {
+          val rc = store.export_pb(manager)
+          rc.failure_option.foreach(error _)
+        }
       }
 
       ServiceControl.stop(store, "store stop");

Copied: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
(from r1054037, activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala?p2=activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala&p1=activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala&r1=1054037&r2=1054038&rev=1054038&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
Fri Dec 31 01:21:29 2010
@@ -19,23 +19,22 @@ package org.apache.activemq.apollo.cli.c
 import org.apache.felix.gogo.commands.{Action, Option => option, Argument => argument, Command => command}
 import org.osgi.service.command.CommandSession
 import org.apache.activemq.apollo.util.FileSupport._
-import org.apache.activemq.apollo.util.OptionSupport._
-import org.apache.commons.codec.binary.Base64
-import java.net.{HttpURLConnection, URL}
-import org.apache.activemq.apollo.broker.{VirtualHost, FileConfigStore}
-import org.apache.activemq.apollo.dto.{VirtualHostDTO, WebAdminDTO}
-import org.apache.activemq.apollo.util.{ServiceControl, Log, DirectBufferPoolFactory, Logging}
-import java.util.zip.{ZipEntry, ZipOutputStream, ZipFile}
+import org.apache.activemq.apollo.broker.FileConfigStore
+import org.apache.activemq.apollo.dto.VirtualHostDTO
+import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.broker.store.{StreamManager, StoreFactory}
-import java.io.{OutputStream, FileOutputStream, File}
+import scala.util.continuations._
+import java.util.zip.{ZipFile, ZipEntry, ZipOutputStream}
+import java.io.{InputStream, OutputStream, FileOutputStream, File}
 
-object StoreExport extends Log
 
 /**
  * The apollo stop command
  */
-@command(scope="apollo", name = "store-export", description = "exports the contents of a broker message store")
-class StoreExport extends Action {
+@command(scope="apollo", name = "store-import", description = "imports a previously exported message store")
+class StoreImport extends Action {
+
+  object StoreImport extends Log
 
   @option(name = "--conf", description = "The Apollo configuration file.")
   var conf: File = _
@@ -61,10 +60,7 @@ class StoreExport extends Action {
         error("Configuration file'%s' does not exist.\n\nTry creating a broker instance using the 'apollo create' command.".format(conf));
       }
 
-      val config_store = new FileConfigStore
-      config_store.file = conf
-      config_store.start
-      val config = config_store.load(true)
+      val config = new FileConfigStore(conf).load(true)
 
       val hosts = collection.JavaConversions.asScalaIterable(config.virtual_hosts).toArray
       val vho:Option[VirtualHostDTO] = if( host==null ) {
@@ -83,23 +79,31 @@ class StoreExport extends Action {
         error("Could not create the store.")
       }
 
-      store.configure(config.store, LoggingReporter(StoreExport))
+      store.configure(vh.store, LoggingReporter(StoreImport))
       ServiceControl.start(store, "store startup")
 
-
-      using( new ZipOutputStream(new FileOutputStream(dest))) { out=>
-        out.setMethod(ZipEntry.DEFLATED)
-        out.setLevel(9)
-        store.export_pb(new StreamManager[OutputStream]() {
-          def entry(name:String, func: (OutputStream) => Unit) = {
-            out.putNextEntry(new ZipEntry(name));
-            func(out)
-            out.closeEntry();
+      val zip = new ZipFile(dest)
+      try {
+        val manager = new StreamManager[InputStream]() {
+          def entry(name:String, func: (InputStream) => Unit) = {
+            val entry = zip.getEntry(name)
+            if(entry == null) {
+              error("Invalid data file, zip entry not found: "+name);
+            }
+            using(zip.getInputStream(entry)) { is=>
+              func(is)
+            }
           }
-          def using_queue_stream(func: (OutputStream) => Unit) = entry("queues.dat", func)
-          def using_queue_entry_stream(func: (OutputStream) => Unit) = entry("queue_entries.dat", func)
-          def using_message_stream(func: (OutputStream) => Unit) = entry("messages.dat", func)
-        })
+          def using_queue_stream(func: (InputStream) => Unit) = entry("queues.dat", func)
+          def using_queue_entry_stream(func: (InputStream) => Unit) = entry("queue_entries.dat", func)
+          def using_message_stream(func: (InputStream) => Unit) = entry("messages.dat", func)
+        }
+        reset {
+          val rc = store.import_pb(manager)
+          rc.failure_option.foreach(error _)
+        }
+      } finally {
+        zip.close
       }
 
       ServiceControl.stop(store, "store stop");

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1054038&r1=1054037&r2=1054038&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
Fri Dec 31 01:21:29 2010
@@ -459,11 +459,11 @@ class JDBM2Client(store: JDBM2Store) {
 
       transaction {
 
-        def foreach[Buffer] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = {
+        def foreach[B] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (B)=>Unit):Unit = {
           var done = false
           do {
             try {
-              func(fact.parseFramed(stream).asInstanceOf[Buffer])
+              func(fact.parseFramed(stream).asInstanceOf[B])
             } catch {
               case x:EOFException =>
                 done = true
@@ -475,7 +475,7 @@ class JDBM2Client(store: JDBM2Store) {
         import PBSupport._
 
         streams.using_queue_stream { queue_stream=>
-          foreach(queue_stream, QueuePB.FACTORY) { pb=>
+          foreach[QueuePB.Buffer](queue_stream, QueuePB.FACTORY) { pb =>
             val record:QueueRecord = pb
             queues_db.put(record.key, record)
             check_flush(1, 10000)
@@ -485,7 +485,7 @@ class JDBM2Client(store: JDBM2Store) {
         recman.commit
 
         streams.using_message_stream { message_stream=>
-          foreach(message_stream, MessagePB.FACTORY) { pb=>
+          foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=>
             val record:MessageRecord = pb
             messages_db.put(record.key, record)
             check_flush(record.size, 1024*124*10)
@@ -495,7 +495,7 @@ class JDBM2Client(store: JDBM2Store) {
         recman.commit
 
         streams.using_queue_entry_stream { queue_entry_stream=>
-          foreach(queue_entry_stream, QueueEntryPB.FACTORY) { pb=>
+          foreach[QueueEntryPB.Buffer](queue_entry_stream, QueueEntryPB.FACTORY) { pb=>
             val record:QueueEntryRecord = pb
             entries_db.insert((record.queue_key, record.entry_seq), record, true)
             add_message_reference(record.message_key)

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1054038&r1=1054037&r2=1054038&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
Fri Dec 31 01:21:29 2010
@@ -71,7 +71,6 @@ class JDBM2Store extends DelayingStoreSu
   var next_msg_key = new AtomicLong(1)
 
   var executor:ExecutorService = _
-  var read_executor:ExecutorService = _
   var config:JDBM2StoreDTO = defaultConfig
   val client = new JDBM2Client(this)
 



Mime
View raw message