activemq-commits mailing list archives

From chir...@apache.org
Subject svn commit: r1234433 [1/2] - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-broker/src/test/scala/org/apache/active...
Date Sat, 21 Jan 2012 22:34:29 GMT
Author: chirino
Date: Sat Jan 21 22:34:28 2012
New Revision: 1234433

URL: http://svn.apache.org/viewvc?rev=1234433&view=rev
Log:
Switch from using zip to tar for the store export/import files, since zip files have a 4GB size limit.
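
For reference, a minimal sketch of the new container layout (hypothetical usage of the tar classes added below): records are written as gzip-compressed tar entries named "<seq>.<ext>", so the archive can be streamed entry-by-entry with no central directory and no 4GB offset limit.

    // Sketch only: write one record as a numbered tar entry, mirroring
    // ExportStreamManager.store(ext, value) further down in this commit.
    import java.io.FileOutputStream
    import java.util.zip.GZIPOutputStream
    import org.apache.activemq.apollo.util.tar.{TarEntry, TarOutputStream}

    val stream = new TarOutputStream(new GZIPOutputStream(new FileOutputStream("export.tgz")))
    val payload = "1".getBytes("UTF-8")
    val entry = new TarEntry("0.ver")   // the first entry carries the format version
    entry.setSize(payload.length)
    stream.putNextEntry(entry)
    stream.write(payload)
    stream.closeEntry()
    stream.close()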

Added:
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarBuffer.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarConstants.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarEntry.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarInputStream.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarOutputStream.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarUtils.java
Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
    activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1234433&r1=1234432&r2=1234433&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Sat Jan 21 22:34:28 2012
@@ -20,19 +20,12 @@ import dto.BDBStoreDTO
 import java.{lang=>jl}
 import java.{util=>ju}
 
-import java.util.concurrent.atomic.AtomicInteger
 import collection.mutable.ListBuffer
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
-import java.io.{EOFException, InputStream, OutputStream}
-import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessageFactory}
-import org.apache.activemq.apollo.util.Log._
-import scala.Some
-import java.sql.ClientInfoStatus
+import java.io.{InputStream, OutputStream}
 import com.sleepycat.je._
-import javax.management.remote.rmi._RMIConnection_Stub
-import org.apache.activemq.apollo.util.FileSupport._
-import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
+import org.fusesource.hawtbuf.Buffer
 
 object BDBClient extends Log
 /**
@@ -549,56 +542,41 @@ class BDBClient(store: BDBStore) {
     }
   }
 
-  def export_pb(streams:StreamManager[OutputStream]):Option[String] = {
+  def export_data(os:OutputStream):Option[String] = {
     try {
-      streams.using_version_stream{ stream=>
-        new AsciiBuffer("1").writeTo(stream)
-      }
+      val manager = ExportStreamManager(os, 1)
 
       with_ctx() { ctx=>
         import ctx._
-        import PBSupport._
 
-        streams.using_map_stream { stream =>
-          map_db.cursor(tx) { (key,value) =>
-            val record = new MapEntryPB.Bean
-            record.setKey(key)
-            record.setValue(value)
-            record.freeze().writeFramed(stream)
-            true
-          }
+        messages_db.cursor(tx) { (_, value) =>
+          val record = MessagePB.FACTORY.parseUnframed(value.getData)
+          manager.store_message(record)
+          true
         }
 
-        streams.using_queue_stream { queue_stream =>
-          queues_db.cursor(tx) { (_, value) =>
-            val record:QueueRecord = value
-            record.freeze.writeFramed(queue_stream)
-            true
-          }
+        entries_db.cursor(tx) { (key, value) =>
+          val record = QueueEntryPB.FACTORY.parseUnframed(value.getData)
+          manager.store_queue_entry(record)
+          true
         }
 
-        streams.using_message_stream { message_stream=>
-          messages_db.cursor(tx) { (_, data) =>
-            import PBSupport._
-            val pb = MessagePB.FACTORY.parseUnframed(data.getData)
-            pb.writeFramed(message_stream)
-            if( pb.hasDirectSize ) {
-              val buffer_cp_buffer = direct_buffer_allocator.slice(pb.getDirectOffset, pb.getDirectSize)
-              buffer_cp_buffer.read(message_stream)
-            }
-            true
-          }
+        queues_db.cursor(tx) { (_, value) =>
+          val record = QueuePB.FACTORY.parseUnframed(value)
+          manager.store_queue(record)
+          true
         }
 
-        streams.using_queue_entry_stream { queue_entry_stream=>
-          entries_db.cursor(tx) { (key, value) =>
-            val record:QueueEntryRecord = value
-            record.freeze.writeFramed(queue_entry_stream)
-            true
-          }
+        map_db.cursor(tx) { (key,value) =>
+          val record = new MapEntryPB.Bean
+          record.setKey(key)
+          record.setValue(value)
+          manager.store_map_entry(record)
+          true
         }
-
       }
+
+      manager.finish
       None
     } catch {
       case x:Exception=>
@@ -606,75 +584,42 @@ class BDBClient(store: BDBStore) {
     }
   }
 
-  def import_pb(streams:StreamManager[InputStream]):Option[String] = {
+  def import_data(is:InputStream):Option[String] = {
     try {
-      streams.using_version_stream {
-        stream =>
-          var ver = read_text(stream).toInt
-          if (ver != 1) {
-            return Some("Cannot import from an export file at version: "+ver)
-          }
+      val manager = ImportStreamManager(is)
+      if(manager.version!=1) {
+        return Some("Cannot import from an export file of version: "+manager.version)
       }
-    } catch {
-      case e => return Some("Could not determine export format version: "+e)
-    }
-    try {
-      purge
 
-      def foreach[Buffer] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = {
-        var done = false
-        do {
-          try {
-            func(fact.parseFramed(stream).asInstanceOf[Buffer])
-          } catch {
-            case x:EOFException =>
-              done = true
-          }
-        } while( !done )
-      }
+      purge
 
       with_ctx() { ctx=>
         import ctx._
-        import PBSupport._
 
-        streams.using_queue_stream { queue_stream=>
-          foreach[QueuePB.Buffer](queue_stream, QueuePB.FACTORY) { pb=>
-            val record:QueueRecord = pb
-            queues_db.put(tx, record.key, record)
-          }
-        }
-
-        streams.using_map_stream { stream=>
-          foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb =>
-            map_db.put(tx, pb.getKey, pb.getValue)
-          }
-        }
+        while(manager.getNext match {
 
-        streams.using_message_stream { message_stream=>
-          foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=>
+          case record:MessagePB.Buffer =>
+            messages_db.put(tx, record.getMessageKey, record.toUnframedBuffer)
+            true
 
-            var record:MessagePB.Buffer = pb
-            if( record.hasDirectSize ) {
-              val cp = record.copy
-              val buffer_cp_buffer = direct_buffer_allocator.alloc(cp.getDirectSize)
-              cp.setDirectOffset(buffer_cp_buffer.offset)
-              buffer_cp_buffer.write(message_stream)
-              lobs_db.put(tx, pb.getMessageKey, (buffer_cp_buffer.offset, buffer_cp_buffer.size))
-              record = cp.freeze
-            }
+          case record:QueueEntryPB.Buffer =>
+            entries_db.put(tx, (record.getQueueKey, record.getQueueSeq), record.toUnframedBuffer)
+            add_and_get(message_refs_db, record.getMessageKey, 1, tx)
+            true
 
-            messages_db.put(tx, record.getMessageKey, record)
-          }
-        }
+          case record:QueuePB.Buffer =>
+            queues_db.put(tx, record.getKey, record.toUnframedBuffer)
+            true
 
-        streams.using_queue_entry_stream { queue_entry_stream=>
-          foreach[QueueEntryPB.Buffer](queue_entry_stream, QueueEntryPB.FACTORY) { pb=>
-            val record:QueueEntryRecord = pb
+          case record:MapEntryPB.Buffer =>
+            map_db.put(tx, record.getKey, record.getValue)
+            true
 
-            entries_db.put(tx, (record.queue_key, record.entry_seq), record)
-            add_and_get(message_refs_db, record.message_key, 1, tx)
-          }
+          case null =>
+            false
+        }) { // keep looping
         }
+
       }
       None
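
The rewritten import loop above leans on a compact idiom: `while (manager.getNext match {...}) {}` lets one match expression both dispatch on the record type and produce the loop condition, terminating when getNext returns null. A standalone sketch of the same shape (next() is a stand-in for manager.getNext):

    // Dispatch-and-loop: the match arms do the work and return the
    // continue/stop flag that drives the while condition.
    val records = Iterator[AnyRef]("a", Integer.valueOf(1), null)
    def next(): AnyRef = records.next()

    while (next() match {
      case s: String  => println("string: " + s); true  // handle, keep going
      case i: Integer => println("int: " + i); true
      case null       => false                          // end of input
    }) { /* keep looping */ }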
 

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1234433&r1=1234432&r2=1234433&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala Sat Jan 21 22:34:28 2012
@@ -228,16 +228,15 @@ class BDBStore(var config:BDBStoreDTO) e
   * Exports the contents of the store to the provided stream.
    */
-  def export_pb(streams:StreamManager[OutputStream], cb:(Option[String])=>Unit) = write_executor {
-    cb(client.export_pb(streams))
+  def export_data(os:OutputStream, cb:(Option[String])=>Unit) = write_executor {
+    cb(client.export_data(os))
   }
 
   /**
   * Imports a previous export from the input stream.  This deletes any previous
   * data in the store.
    */
-  def import_pb(streams:StreamManager[InputStream], cb:(Option[String])=>Unit) = write_executor {
-    cb(client.import_pb(streams))
+  def import_data(is:InputStream, cb:(Option[String])=>Unit) = write_executor {
+    cb(client.import_data(is))
   }
-
 }
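
Both store implementations (here and in LevelDBStore below) keep the same asynchronous contract: the work runs on the store's write executor and the callback receives None on success or Some(error) on failure. A generic sketch of that shape, with a single-thread executor standing in for write_executor:

    // Sketch of the callback-on-executor contract behind export_data/import_data.
    import java.util.concurrent.Executors

    val write_executor = Executors.newSingleThreadExecutor()

    def export_data(os: java.io.OutputStream, cb: Option[String] => Unit): Unit =
      write_executor.execute(new Runnable {
        def run() = cb(None)  // None = success, Some(message) = failure
      })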

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1234433&r1=1234432&r2=1234433&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Sat Jan 21 22:34:28 2012
@@ -48,14 +48,14 @@ object PBSupport {
     rc
   }
 
-  def encode_message_record(out: OutputStream, v: MessageRecord) = to_pb(v).freeze.writeUnframed(out)
-  def decode_message_record(in: InputStream):MessageRecord = MessagePB.FACTORY.parseUnframed(in)
+  def encode_message_record(out: OutputStream, v: MessageRecord) = to_pb(v).freeze.writeFramed(out)
+  def decode_message_record(in: InputStream):MessageRecord = MessagePB.FACTORY.parseFramed(in)
 
-  implicit def encode_message_record(v: MessageRecord):Array[Byte] = to_pb(v).freeze.toUnframedByteArray
-  implicit def decode_message_record(data: Array[Byte]):MessageRecord = MessagePB.FACTORY.parseUnframed(data)
+  implicit def encode_message_record(v: MessageRecord):Array[Byte] = to_pb(v).freeze.toFramedByteArray
+  implicit def decode_message_record(data: Array[Byte]):MessageRecord = MessagePB.FACTORY.parseFramed(data)
 
-  implicit def encode_message_record_buffer(v: MessageRecord) = to_pb(v).freeze.toUnframedBuffer
-  implicit def decode_message_record_buffer(data: Buffer):MessageRecord = MessagePB.FACTORY.parseUnframed(data)
+  implicit def encode_message_record_buffer(v: MessageRecord) = to_pb(v).freeze.toFramedBuffer
+  implicit def decode_message_record_buffer(data: Buffer):MessageRecord = MessagePB.FACTORY.parseFramed(data)
 
 
   implicit def to_pb(v: QueueRecord):QueuePB.Bean = {
@@ -70,14 +70,14 @@ object PBSupport {
     QueueRecord(pb.getKey, pb.getBindingKind, pb.getBindingData)
   }
 
-  def encode_queue_record(out: OutputStream, v: QueueRecord) = to_pb(v).freeze.writeUnframed(out)
-  def decode_queue_record(in: InputStream):QueueRecord = QueuePB.FACTORY.parseUnframed(in)
+  def encode_queue_record(out: OutputStream, v: QueueRecord) = to_pb(v).freeze.writeFramed(out)
+  def decode_queue_record(in: InputStream):QueueRecord = QueuePB.FACTORY.parseFramed(in)
 
-  implicit def encode_queue_record(v: QueueRecord) = to_pb(v).freeze.toUnframedByteArray
-  implicit def decode_queue_record(data: Array[Byte]):QueueRecord = QueuePB.FACTORY.parseUnframed(data)
+  implicit def encode_queue_record(v: QueueRecord) = to_pb(v).freeze.toFramedByteArray
+  implicit def decode_queue_record(data: Array[Byte]):QueueRecord = QueuePB.FACTORY.parseFramed(data)
 
-  implicit def encode_queue_record_buffer(v: QueueRecord) = to_pb(v).freeze.toUnframedBuffer
-  implicit def decode_queue_record_buffer(data: Buffer):QueueRecord = QueuePB.FACTORY.parseUnframed(data)
+  implicit def encode_queue_record_buffer(v: QueueRecord) = to_pb(v).freeze.toFramedBuffer
+  implicit def decode_queue_record_buffer(data: Buffer):QueueRecord = QueuePB.FACTORY.parseFramed(data)
 
   implicit def to_pb(v: QueueEntryRecord):QueueEntryPB.Bean = {
     val pb = new QueueEntryPB.Bean
@@ -105,13 +105,13 @@ object PBSupport {
     rc
   }
 
-  def encode_queue_entry_record(out: OutputStream, v: QueueEntryRecord) = to_pb(v).freeze.writeUnframed(out)
-  def decode_queue_entry_record(in: InputStream):QueueEntryRecord = QueueEntryPB.FACTORY.parseUnframed(in)
+  def encode_queue_entry_record(out: OutputStream, v: QueueEntryRecord) = to_pb(v).freeze.writeFramed(out)
+  def decode_queue_entry_record(in: InputStream):QueueEntryRecord = QueueEntryPB.FACTORY.parseFramed(in)
 
-  implicit def encode_queue_entry_record(v: QueueEntryRecord) = to_pb(v).freeze.toUnframedByteArray
-  implicit def decode_queue_entry_record(data: Array[Byte]):QueueEntryRecord = QueueEntryPB.FACTORY.parseUnframed(data)
+  implicit def encode_queue_entry_record(v: QueueEntryRecord) = to_pb(v).freeze.toFramedByteArray
+  implicit def decode_queue_entry_record(data: Array[Byte]):QueueEntryRecord = QueueEntryPB.FACTORY.parseFramed(data)
 
-  implicit def encode_queue_entry_record_buffer(v: QueueEntryRecord) = to_pb(v).freeze.toUnframedBuffer
-  implicit def decode_queue_entry_record_buffer(data: Buffer):QueueEntryRecord = QueueEntryPB.FACTORY.parseUnframed(data)
+  implicit def encode_queue_entry_record_buffer(v: QueueEntryRecord) = to_pb(v).freeze.toFramedBuffer
+  implicit def decode_queue_entry_record_buffer(data: Buffer):QueueEntryRecord = QueueEntryPB.FACTORY.parseFramed(data)
 
 }
\ No newline at end of file
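
The switch from the unframed to the framed forms above matters because a framed record is prefixed with its varint-encoded length, so consecutive records can be read back off a single stream. A conceptual sketch of the difference (hand-rolled, not the hawtbuf API):

    import java.io.OutputStream

    // Varint: 7 bits per byte, high bit set on all but the last byte.
    def writeVarInt(out: OutputStream, v0: Int): Unit = {
      var v = v0
      while ((v & ~0x7f) != 0) { out.write((v & 0x7f) | 0x80); v >>>= 7 }
      out.write(v)
    }

    // Unframed = just the record bytes; framed = length prefix + bytes,
    // which is what lets a reader delimit records in a stream.
    def writeFramed(out: OutputStream, record: Array[Byte]): Unit = {
      writeVarInt(out, record.length)
      out.write(record)
    }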

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1234433&r1=1234432&r2=1234433&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala Sat Jan 21 22:34:28 2012
@@ -18,48 +18,88 @@ package org.apache.activemq.apollo.broke
  */
 import org.apache.activemq.apollo.dto.StoreStatusDTO
 import org.apache.activemq.apollo.util._
-import java.io.{InputStream, OutputStream}
 import java.util.concurrent.atomic.AtomicReference
-import org.fusesource.hawtbuf.Buffer
-import java.util.zip.{ZipFile, ZipEntry, ZipOutputStream}
-import FileSupport._
-
-trait StreamManager[A] {
-  def using_version_stream(func: (A)=>Unit)
-  def using_map_stream(func: (A)=>Unit)
-  def using_queue_stream(func: (A)=>Unit)
-  def using_message_stream(func: (A)=>Unit)
-  def using_queue_entry_stream(func: (A)=>Unit)
-}
+import org.apache.activemq.apollo.util.tar._
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+import org.fusesource.hawtbuf.{AsciiBuffer, Buffer}
+import java.io._
+import org.apache.activemq.apollo.util.FileSupport._
+import org.fusesource.hawtbuf.proto.MessageBuffer
+
+case class ExportStreamManager(target:OutputStream, version:Int) {
+  val stream = new TarOutputStream(new GZIPOutputStream(target))
+
+  var seq:Long = 0;
+  
+  def finish = stream.close()
+
+  private def store(ext:String, value:Buffer) = {
+    var entry = new TarEntry(seq.toString + "." + ext)
+    seq += 1
+    entry.setSize(value.length())
+    stream.putNextEntry(entry);
+    value.writeTo(stream)
+    stream.closeEntry();
+  }
+
+  private def store(ext:String, value:MessageBuffer[_,_]) = {
+    var entry = new TarEntry(seq.toString + "." + ext)
+    seq += 1
+    entry.setSize(value.serializedSizeFramed())
+    stream.putNextEntry(entry);
+    value.writeFramed(stream)
+    stream.closeEntry();
+  }
+
+  store("ver", new AsciiBuffer(version.toString))
+
+  def store_queue(value:QueuePB.Getter) = {
+    store("que", value.freeze())
+  }
+  def store_queue_entry(value:QueueEntryPB.Getter) = {
+    store("qen", value.freeze())
+  }
+  def store_message(value:MessagePB.Getter) = {
+    store("msg", value.freeze())
+  }
+  def store_map_entry(value:MapEntryPB.Getter) = {
+    store("map", value.freeze())
+  }
 
-case class ZipOputputStreamManager(out:ZipOutputStream) extends StreamManager[OutputStream]() {
-  def entry(name:String, func: (OutputStream) => Unit) = {
-    out.putNextEntry(new ZipEntry(name));
-    func(out)
-    out.closeEntry();
-  }
-  def using_version_stream(func: (OutputStream) => Unit) = entry("version.txt", func)
-  def using_queue_stream(func: (OutputStream) => Unit) = entry("queues.dat", func)
-  def using_queue_entry_stream(func: (OutputStream) => Unit) = entry("queue_entries.dat", func)
-  def using_message_stream(func: (OutputStream) => Unit) = entry("messages.dat", func)
-  def using_map_stream(func: (OutputStream) => Unit) = entry("map.dat", func)
 }
 
-case class ZipInputStreamManager(zip:ZipFile) extends StreamManager[InputStream]() {
-  def entry(name:String, func: (InputStream) => Unit) = {
-    val entry = zip.getEntry(name)
-    if(entry == null) {
-      sys.error("Invalid data file, zip entry not found: "+name);
+case class ImportStreamManager(source:InputStream) {
+  
+  val stream = new TarInputStream(new GZIPInputStream(source))
+
+  val version = try {
+    var entry = stream.getNextEntry
+    if( entry.getName != "0.ver" ) {
+      throw new Exception("0.ver entry missing")
     }
-    using(zip.getInputStream(entry)) { is=>
-      func(is)
+    read_text(stream).toInt
+  } catch {
+    case e => throw new IOException("Could not determine export format version: "+e)
+  }
+  
+  def getNext:AnyRef = {
+    var entry = stream.getNextEntry
+    if( entry==null ) {
+      return null;
+    }
+
+    if( entry.getName.endsWith(".qen") ) {
+      QueueEntryPB.FACTORY.parseFramed(stream)
+    } else if( entry.getName.endsWith(".msg") ) {
+      MessagePB.FACTORY.parseFramed(stream)
+    } else if( entry.getName.endsWith(".que") ) {
+      QueuePB.FACTORY.parseFramed(stream)
+    } else if( entry.getName.endsWith(".map") ) {
+      MapEntryPB.FACTORY.parseFramed(stream)
+    } else {
+      throw new Exception("Unknown entry: "+entry.getName)
     }
   }
-  def using_version_stream(func: (InputStream) => Unit) = entry("version.txt", func)
-  def using_queue_stream(func: (InputStream) => Unit) = entry("queues.dat", func)
-  def using_queue_entry_stream(func: (InputStream) => Unit) = entry("queue_entries.dat", func)
-  def using_message_stream(func: (InputStream) => Unit) = entry("messages.dat", func)
-  def using_map_stream(func: (InputStream) => Unit) = entry("map.dat", func)
 }
 
 
@@ -146,14 +186,12 @@ trait Store extends ServiceTrait {
   def load_message(messageKey:Long, locator:AtomicReference[Object])(callback:(Option[MessageRecord])=>Unit )
 
   /**
-   * Exports the contents of the store to the provided streams.  Each stream should contain
-   * a list of framed protobuf objects with the corresponding object types.
+   * Exports the contents of the store to the provided stream.
    */
-  def export_pb(streams:StreamManager[OutputStream], cb:(Option[String])=>Unit):Unit
+  def export_data(os:OutputStream, cb:(Option[String])=>Unit):Unit
 
   /**
-   * Imports a previously exported set of streams.  This deletes any previous data
-   * in the store.
+   * Imports a previous export from the input stream.
    */
-  def import_pb(streams:StreamManager[InputStream], cb:(Option[String])=>Unit):Unit
+  def import_data(is:InputStream, cb:(Option[String])=>Unit):Unit
 }
\ No newline at end of file
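
Taken together, the two managers above give a symmetric API. A minimal round-trip sketch (file name illustrative):

    import java.io.{BufferedInputStream, BufferedOutputStream, FileInputStream, FileOutputStream}

    // Export: the version entry ("0.ver") is written as soon as the
    // manager is constructed; finish closes the tar and gzip streams.
    val exporter = ExportStreamManager(new BufferedOutputStream(new FileOutputStream("export.tgz")), 1)
    // exporter.store_queue(...) / store_queue_entry(...) / store_message(...) / store_map_entry(...)
    exporter.finish

    // Import: the version is validated from the first entry, then getNext
    // yields typed PB buffers until it returns null.
    val importer = ImportStreamManager(new BufferedInputStream(new FileInputStream("export.tgz")))
    assert(importer.version == 1)
    var record = importer.getNext
    while (record != null) { /* match on the record type */ record = importer.getNext }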

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1234433&r1=1234432&r2=1234433&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Sat Jan 21 22:34:28 2012
@@ -18,13 +18,12 @@ package org.apache.activemq.apollo.broke
  */
 import org.fusesource.hawtbuf.AsciiBuffer._
 import org.fusesource.hawtdispatch.TaskTracker
-import java.util.concurrent.{TimeUnit, CountDownLatch}
+import java.util.concurrent.TimeUnit
 import collection.mutable.ListBuffer
 import org.apache.activemq.apollo.util.{LoggingTracker, FunSuiteSupport, LongCounter}
 import org.scalatest.BeforeAndAfterEach
 import org.apache.activemq.apollo.util.FileSupport._
 import java.util.concurrent.atomic.AtomicReference
-import java.util.zip.{ZipFile, ZipOutputStream, ZipEntry}
 import java.io._
 import org.apache.activemq.apollo.util.sync_cb
 
@@ -144,15 +143,13 @@ abstract class StoreFunSuiteSupport exte
       rc.get.buffer
     }
 
-    val file = test_data_dir / "export.zip"
+    val file = test_data_dir / "export.tgz"
     file.getParentFile.mkdirs()
-    using( new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(file)))) { out=>
-      val manager = ZipOputputStreamManager(out)
-
+    using( new BufferedOutputStream(new FileOutputStream(file))) { os =>
       // Export the data...
       expect(None) {
         sync_cb[Option[String]] { cb =>
-          store.export_pb(manager, cb)
+          store.export_data(os, cb)
         }
       }
     }
@@ -166,16 +163,12 @@ abstract class StoreFunSuiteSupport exte
     }
 
     // Import the data..
-    val zip = new ZipFile(file)
-    try {
-      val manager = ZipInputStreamManager(zip)
+    using(new BufferedInputStream(new FileInputStream(file))) { is =>
       expect(None) {
         sync_cb[Option[String]] { cb =>
-          store.import_pb(manager, cb)
+          store.import_data(is, cb)
         }
       }
-    } finally {
-      zip.close
     }
 
     // The data should be there now again..
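
The test drives the asynchronous store API synchronously through sync_cb. Roughly, such a helper blocks on a latch until the callback fires; a sketch assuming a latch-based implementation (not necessarily what apollo-util does):

    import java.util.concurrent.CountDownLatch

    // Turn a callback-taking function into a blocking call.
    def sync_cb[T](func: (T => Unit) => Unit): T = {
      val latch = new CountDownLatch(1)
      var result: Option[T] = None
      func { t => result = Some(t); latch.countDown() }
      latch.await()
      result.get
    }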

Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala?rev=1234433&r1=1234432&r2=1234433&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala Sat Jan 21 22:34:28 2012
@@ -25,8 +25,8 @@ import org.apache.felix.service.command.
 import org.apache.activemq.apollo.broker.ConfigStore
 import java.io._
 import java.util.concurrent.CountDownLatch
-import org.apache.activemq.apollo.broker.store.ZipOputputStreamManager._
-import org.apache.activemq.apollo.broker.store.{ZipOputputStreamManager, StreamManager, StoreFactory}
+import org.apache.activemq.apollo.broker.store.StoreFactory
 
 /**
  * The apollo stop command
@@ -42,7 +42,7 @@ class StoreExport extends Action {
   @option(name = "--virtual-host", description = "The id of the virtual host to export, if not specified, the default virtual host is selected.")
   var host: String = _
 
-  @argument(name = "file", description = "The zip file to hold the exported data", index=0, required=true)
+  @argument(name = "file", description = "The compressed tar file to hold the exported data", index=0, required=true)
   var file:File = _
 
   def execute(session: CommandSession):AnyRef = {
@@ -79,15 +79,13 @@ class StoreExport extends Action {
         error("Could not create the store.")
       }
 
+      session.getConsole.println("Starting store: "+store)
       ServiceControl.start(store, "store startup")
-      session.getConsole.println("Exporting... (this might take a while)")
-      using( new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(file)))) { out=>
-        out.setMethod(ZipEntry.DEFLATED)
-        out.setLevel(9)
-        val manager = ZipOputputStreamManager(out)
 
+      session.getConsole.println("Exporting... (this might take a while)")
+      using( new BufferedOutputStream(new FileOutputStream(file))) { os =>
         sync_cb[Option[String]] { cb =>
-          store.export_pb(manager, cb)
+          store.export_data(os, cb)
         }.foreach(error _)
       }
       ServiceControl.stop(store, "store stop");

Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala?rev=1234433&r1=1234432&r2=1234433&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala Sat Jan 21 22:34:28 2012
@@ -22,11 +22,11 @@ import org.apache.activemq.apollo.dto.Vi
 import org.apache.activemq.apollo.util._
 import scala.util.continuations._
 import java.util.zip.ZipFile
-import java.io.{InputStream, File}
 import org.apache.felix.service.command.CommandSession
 import org.apache.activemq.apollo.broker.ConfigStore
-import org.apache.activemq.apollo.broker.store.ZipInputStreamManager._
-import org.apache.activemq.apollo.broker.store.{ZipInputStreamManager, StreamManager, StoreFactory}
+import org.apache.activemq.apollo.broker.store.StoreFactory
+import java.io.{FileInputStream, BufferedInputStream, InputStream, File}
 
 
 /**
@@ -43,7 +43,7 @@ class StoreImport extends Action {
   @option(name = "--virtual-host", description = "The id of the virtual host to import into, if not specified, the default virtual host is selected.")
   var host: String = _
 
-  @argument(name = "file", description = "The zip file the contains that data for the import", index=0, required=true)
+  @argument(name = "file", description = "The compressed tar file the contains that data for the import", index=0, required=true)
   var file:File = _
 
   def execute(session: CommandSession):AnyRef = {
@@ -80,17 +80,14 @@ class StoreImport extends Action {
         error("Could not create the store.")
       }
 
+      session.getConsole.println("Starting store: "+store)
       ServiceControl.start(store, "store startup")
 
       session.getConsole.println("Importing: "+file)
-      val zip = new ZipFile(file)
-      try {
-        val manager = ZipInputStreamManager(zip)
+      using( new BufferedInputStream(new FileInputStream(file))) { is =>
         sync_cb[Option[String]] { cb =>
-          store.import_pb(manager, cb)
+          store.import_data(is, cb)
         }.foreach(error _)
-      } finally {
-        zip.close
       }
 
       ServiceControl.stop(store, "store stop");

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1234433&r1=1234432&r2=1234433&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Sat Jan 21 22:34:28 2012
@@ -346,7 +346,7 @@ class LevelDBClient(store: LevelDBStore)
                   case LOG_ADD_MESSAGE =>
                   case LOG_ADD_QUEUE_ENTRY =>
                     replay_operations+=1
-                    val record = QueueEntryPB.FACTORY.parseUnframed(data)
+                    val record = QueueEntryPB.FACTORY.parseFramed(data)
                     val pos = decode_vlong(record.getMessageLocator)
                     index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), data)
                     pos.foreach(log_ref_increment(_))
@@ -354,7 +354,7 @@ class LevelDBClient(store: LevelDBStore)
                   case LOG_REMOVE_QUEUE_ENTRY =>
                     replay_operations+=1
                     index.get(data, new ReadOptions).foreach { value=>
-                      val record = QueueEntryPB.FACTORY.parseUnframed(value)
+                      val record = QueueEntryPB.FACTORY.parseFramed(value)
                       val pos = decode_vlong(record.getMessageLocator)
                       pos.foreach(log_ref_decrement(_))
                       index.delete(data)
@@ -362,7 +362,7 @@ class LevelDBClient(store: LevelDBStore)
                     
                   case LOG_ADD_QUEUE =>
                     replay_operations+=1
-                    val record = QueuePB.FACTORY.parseUnframed(data)
+                    val record = QueuePB.FACTORY.parseFramed(data)
                     index.put(encode_key(queue_prefix, record.getKey), data)
 
                   case LOG_REMOVE_QUEUE =>
@@ -377,7 +377,7 @@ class LevelDBClient(store: LevelDBStore)
 
                       // Figure out what log file that message entry was in so we can,
                       // decrement the log file reference.
-                      val record = QueueEntryPB.FACTORY.parseUnframed(value)
+                      val record = QueueEntryPB.FACTORY.parseFramed(value)
                       val pos = decode_vlong(record.getMessageLocator)
                       log_ref_decrement(pos)
                       true
@@ -385,7 +385,7 @@ class LevelDBClient(store: LevelDBStore)
 
                   case LOG_MAP_ENTRY =>
                     replay_operations+=1
-                    val entry = MapEntryPB.FACTORY.parseUnframed(data)
+                    val entry = MapEntryPB.FACTORY.parseFramed(data)
                     if (entry.getValue == null) {
                       index.delete(encode_key(map_prefix, entry.getKey))
                     } else {
@@ -421,7 +421,7 @@ class LevelDBClient(store: LevelDBStore)
     index.cursor_prefixed(queue_entry_prefix_array) { (key, value)=>
       try {
         val (_, queue_key, seq_key) = decode_long_long_key(key)
-        val record = QueueEntryPB.FACTORY.parseUnframed(value)
+        val record = QueueEntryPB.FACTORY.parseFramed(value)
         val (pos, len) = decode_locator(record.getMessageLocator)
         if (record.getQueueKey != queue_key) {
           throw new IOException("key missmatch")
@@ -448,7 +448,7 @@ class LevelDBClient(store: LevelDBStore)
     index.cursor_prefixed(queue_prefix_array) { (key, value)=>
       try {
         val (_, queue_key) = decode_long_key(key)
-        val record = QueuePB.FACTORY.parseUnframed(value)
+        val record = QueuePB.FACTORY.parseFramed(value)
         if (record.getKey != queue_key) {
           throw new IOException("key missmatch")
         }
@@ -469,7 +469,7 @@ class LevelDBClient(store: LevelDBStore)
         trace("invalid queue entry record: %s, error: queue key does not exits %s", new Buffer(key), queue_key)
         fixed_records += 1
         index.delete(key)
-        val record = QueueEntryPB.FACTORY.parseUnframed(value)
+        val record = QueueEntryPB.FACTORY.parseFramed(value)
         val pos = decode_vlong(record.getMessageLocator)
         log.log_info(pos).foreach { log_info =>
           actual_log_refs.get(log_info.position).foreach { counter =>
@@ -718,7 +718,7 @@ class LevelDBClient(store: LevelDBStore)
 
           // Figure out what log file that message entry was in so we can,
           // decrement the log file reference.
-          val record = QueueEntryPB.FACTORY.parseUnframed(value)
+          val record = QueueEntryPB.FACTORY.parseFramed(value)
           val pos = decode_vlong(record.getMessageLocator)
           log_ref_decrement(pos)
           true
@@ -745,7 +745,7 @@ class LevelDBClient(store: LevelDBStore)
                 entry.setValue(value)
                 batch.put(encode_key(map_prefix, key), value.toByteArray)
               }
-              appender.append(LOG_MAP_ENTRY, entry.freeze().toUnframedByteArray)
+              appender.append(LOG_MAP_ENTRY, entry.freeze().toFramedByteArray)
             }
 
             uow.actions.foreach { case (msg, action) =>
@@ -783,7 +783,7 @@ class LevelDBClient(store: LevelDBStore)
                 val record = PBSupport.to_pb(entry)
                 record.setMessageLocator(locator_buffer)
 
-                val encoded = record.freeze().toUnframedBuffer
+                val encoded = record.freeze().toFramedBuffer
                 appender.append(LOG_ADD_QUEUE_ENTRY, encoded)
                 batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
                 
@@ -906,7 +906,7 @@ class LevelDBClient(store: LevelDBStore)
             group.first_entry_seq = current_key
           }
 
-          val entry = QueueEntryPB.FACTORY.parseUnframed(value)
+          val entry = QueueEntryPB.FACTORY.parseFramed(value)
           val pos = decode_vlong(entry.getMessageLocator)
 
           group.last_entry_seq = current_key
@@ -947,7 +947,7 @@ class LevelDBClient(store: LevelDBStore)
         val start = encode_key(queue_entry_prefix, queue_key, firstSeq)
         val end = encode_key(queue_entry_prefix, queue_key, lastSeq+1)
         index.cursor_range( start, end, ro ) { (key, value) =>
-          val record = QueueEntryPB.FACTORY.parseUnframed(value)
+          val record = QueueEntryPB.FACTORY.parseFramed(value)
           val entry = PBSupport.from_pb(record)
           entry.message_locator = new AtomicReference[Object](decode_locator(record.getMessageLocator))
           rc += entry
@@ -1063,11 +1063,9 @@ class LevelDBClient(store: LevelDBStore)
 //  }
 
 
-  def export_pb(streams:StreamManager[OutputStream]):Option[String] = {
+  def export_data(os:OutputStream):Option[String] = {
     try {
-      streams.using_version_stream{ stream=>
-        new AsciiBuffer("1").writeTo(stream)
-      }
+      val manager = ExportStreamManager(os, 1)
 
       retry_using_index {
         
@@ -1078,68 +1076,62 @@ class LevelDBClient(store: LevelDBStore)
         }
         
         index.snapshot { snapshot=>
-          val ro = new ReadOptions
-          ro.snapshot(snapshot)
-          ro.verifyChecksums(verify_checksums)
-          ro.fillCache(false)
+          val nocache = new ReadOptions
+          nocache.snapshot(snapshot)
+          nocache.verifyChecksums(verify_checksums)
+          nocache.fillCache(false)
           
-          def write_framed(stream:OutputStream, value:Array[Byte]) = {
-            val helper = new AbstractVarIntSupport {
-              def readByte: Byte = throw new UnsupportedOperationException
-              def writeByte(value: Int) = stream.write(value)
-            }
-            helper.writeVarInt(value.length)
-            stream.write(value);
+          val cache = new ReadOptions
+          cache.snapshot(snapshot)
+          cache.verifyChecksums(false)
+          cache.fillCache(false)
+
+          // Build a temp table of all references messages by the queues
+          // Remember 2 queues could reference the same message.
+          index.cursor_prefixed(queue_entry_prefix_array, cache) { (_, value) =>
+            val record = QueueEntryPB.FACTORY.parseFramed(value)
+            val (pos, len) = decode_locator(record.getMessageLocator)
+            index.put(encode_key(tmp_prefix, pos), encode_vlong(len))
             true
           }
 
-          streams.using_map_stream { stream=>
-            index.cursor_prefixed(map_prefix_array, ro) { (key, value) =>
-              val key_buffer = new Buffer(key)
-              key_buffer.moveHead(1)
-              val record = new MapEntryPB.Bean
-              record.setKey(key_buffer)
-              record.setValue(new Buffer(value))
-              record.freeze().writeFramed(stream)
-              true
+          // Use the temp table to export all the referenced messages. Use
+          // the log position as the message key.
+          index.cursor_prefixed(Array(tmp_prefix)) { (key, value) =>
+            val (_, pos) = decode_long_key(key)
+            val len = decode_vlong(value).toInt
+            log.read(pos, len).foreach { value =>
+              // Set the message key to be the position in the log.
+              val record = MessagePB.FACTORY.parseFramed(value).copy
+              record.setMessageKey(pos)
+              manager.store_message(record)
             }
+            true
           }
 
-          streams.using_queue_stream { stream =>
-            index.cursor_prefixed(queue_prefix_array) { (_, value) =>
-              write_framed(stream, value)
-            }
+          // Now export the queue entries
+          index.cursor_prefixed(queue_entry_prefix_array, nocache) { (_, value) =>
+            val record = QueueEntryPB.FACTORY.parseFramed(value).copy()
+            val (pos, len) = decode_locator(record.getMessageLocator)
+            record.setMessageKey(pos)
+            manager.store_queue_entry(record)
+            true
           }
 
-          // Figure out the active log locations..
-          streams.using_queue_entry_stream { stream=>
-            index.cursor_prefixed(queue_entry_prefix_array, ro) { (_, value) =>
-              val record = QueueEntryPB.FACTORY.parseUnframed(value).copy()
-              val (pos, len) = decode_locator(record.getMessageLocator)
-              record.setMessageKey(pos)
-              write_framed(stream, record.freeze().toUnframedByteArray)
-              index.put(encode_key(tmp_prefix, pos), encode_vlong(len))
-              true
-            }
+          index.cursor_prefixed(queue_prefix_array) { (_, value) =>
+            val record = QueuePB.FACTORY.parseFramed(value)
+            manager.store_queue(record)
+            true
           }
 
-          streams.using_message_stream { stream=>
-            index.cursor_prefixed(Array(tmp_prefix)) { (key, value) =>
-              val (_, pos) = decode_long_key(key)
-              val len = decode_vlong(value).toInt
-              log.read(pos, len).foreach { value =>
-                // Set the message key to be the position in the log.
-                val copy = MessagePB.FACTORY.parseUnframed(value).copy
-                copy.setMessageKey(pos)
-                val data = copy.freeze.toUnframedBuffer
-                
-                val check = MessagePB.FACTORY.parseUnframed(data).copy
-                assert(check.getMessageKey == pos)
-                
-                write_framed(stream, data)
-              }
-              true
-            }
+          index.cursor_prefixed(map_prefix_array, nocache) { (key, value) =>
+            val key_buffer = new Buffer(key)
+            key_buffer.moveHead(1)
+            val record = new MapEntryPB.Bean
+            record.setKey(key_buffer)
+            record.setValue(new Buffer(value))
+            manager.store_map_entry(record)
+            true
           }
 
         }
@@ -1151,84 +1143,65 @@ class LevelDBClient(store: LevelDBStore)
         }
 
       }
+      manager.finish
+
       None
     } catch {
       case x:Exception=>
+        debug(x, "Export failed")
+        x.printStackTrace()
         Some(x.getMessage)
     }
   }
 
-  def import_pb(streams:StreamManager[InputStream]):Option[String] = {
+  def import_data(is:InputStream):Option[String] = {
     try {
-      streams.using_version_stream {
-        stream =>
-          var ver = read_text(stream).toInt
-          if (ver != 1) {
-            return Some("Cannot import from an export file at version: "+ver)
-          }
+      val manager = ImportStreamManager(is)
+      if(manager.version!=1) {
+        return Some("Cannot import from an export file of version: "+manager.version)
       }
-    } catch {
-      case e => return Some("Could not determine export format version: "+e)
-    }
 
-    try {
       purge
-      log_refs.clear()
-          
-      val actual_log_refs = HashMap[Long, LongCounter]()
-      
-      retry_using_index {
-        def foreach[Buffer] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = {
-          var done = false
-          do {
-            try {
-              func(fact.parseFramed(stream).asInstanceOf[Buffer])
-            } catch {
-              case x:EOFException =>
-                done = true
-            }
-          } while( !done )
-        }
 
+      retry_using_index {
         log.appender { appender =>
-                
-          streams.using_map_stream { stream=>
-            foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb =>
-              index.put(encode_key(map_prefix, pb.getKey), pb.getValue.toByteArray)
-            }
-          }
-
-          streams.using_queue_stream { stream=>
-            foreach[QueuePB.Buffer](stream, QueuePB.FACTORY) { record=>
-              index.put(encode_key(queue_prefix, record.getKey), record.toUnframedByteArray)
-            }
-          }
+          while(manager.getNext match {
 
-          streams.using_message_stream { stream=>
-            foreach[MessagePB.Buffer](stream, MessagePB.FACTORY) { record=>
-              val message_data = record.toUnframedBuffer
+            case record:MessagePB.Buffer =>
+              val message_data = record.toFramedBuffer
               val pos = appender.append(LOG_ADD_MESSAGE, message_data)
               index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, message_data.length))
-            }
-          }
+              true
 
-          streams.using_queue_entry_stream { stream=>
-            foreach[QueueEntryPB.Buffer](stream, QueueEntryPB.FACTORY) { record=>
+            case record:QueueEntryPB.Buffer =>
               val copy = record.copy();
               var original_msg_key: Long = record.getMessageKey
               index.get(encode_key(tmp_prefix, original_msg_key)) match {
                 case Some(locator)=>
                 val (pos, len) = decode_locator(locator)
                 copy.setMessageLocator(locator)
-                index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toUnframedBuffer)
+                index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toFramedBuffer)
                 log.log_info(pos).foreach { log_info =>
                   log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
                 }
                 case None =>
                   println("Invalid queue entry, references message that was not in the export: "+original_msg_key)
               }
-            }
+              true
+
+            case record:QueuePB.Buffer =>
+              index.put(encode_key(queue_prefix, record.getKey), record.toFramedBuffer)
+              true
+
+            case record:MapEntryPB.Buffer =>
+              index.put(encode_key(map_prefix, record.getKey), record.getValue)
+              true
+
+            case null =>
+              false
+          }) { // keep looping
           }
+
         }
       }
 
@@ -1244,6 +1217,7 @@ class LevelDBClient(store: LevelDBStore)
 
     } catch {
       case x:Exception=>
+        debug(x, "Import failed")
         Some(x.getMessage)
     }
   }
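
The LevelDB export above is two-pass: queue entries are scanned first to record every referenced log position under tmp_prefix, then each referenced message is exported exactly once with its log position as the message key (so two queues sharing a message do not duplicate it), and the import resolves those keys back to fresh log locators. A conceptual sketch with a plain map standing in for the tmp_prefix index (names illustrative):

    import scala.collection.mutable

    case class Entry(queueKey: Long, seq: Long, msgPos: Long, msgLen: Int)

    // Two queues referencing the same message at log position 100.
    val entries = Seq(Entry(1, 1, 100, 10), Entry(2, 1, 100, 10))

    // Pass 1: collect every (position -> length) referenced by a queue entry.
    val referenced = mutable.LinkedHashMap[Long, Int]()
    entries.foreach { e => referenced(e.msgPos) = e.msgLen }

    // Pass 2: export each referenced message once, keyed by its log position;
    // queue entries carry the same key so the importer can re-link them.
    referenced.foreach { case (pos, len) =>
      println("store_message(key=" + pos + ", bytes=" + len + ")")
    }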

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1234433&r1=1234432&r2=1234433&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala Sat Jan 21 22:34:28 2012
@@ -290,16 +290,16 @@ class LevelDBStore(val config:LevelDBSto
   * Exports the contents of the store to the provided stream.
    */
-  def export_pb(streams:StreamManager[OutputStream], cb:(Option[String])=>Unit) = write_executor {
-    cb(client.export_pb(streams))
+  def export_data(os:OutputStream, cb:(Option[String])=>Unit) = write_executor {
+    cb(client.export_data(os))
   }
 
   /**
   * Imports a previous export from the input stream.  This deletes any previous
   * data in the store.
    */
-  def import_pb(streams:StreamManager[InputStream], cb:(Option[String])=>Unit) = write_executor {
-    cb(client.import_pb(streams))
+  def import_data(is:InputStream, cb:(Option[String])=>Unit) = write_executor {
+    cb(client.import_data(is))
   }
 
 }

Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarBuffer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarBuffer.java?rev=1234433&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarBuffer.java (added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarBuffer.java Sat Jan 21 22:34:28 2012
@@ -0,0 +1,462 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.apollo.util.tar;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * The TarBuffer class implements the tar archive concept
+ * of a buffered input stream. This concept goes back to the
+ * days of blocked tape drives and special io devices. In the
+ * Java universe, the only real function that this class
+ * performs is to ensure that files have the correct "block"
+ * size, or other tars will complain.
+ * <p>
+ * You should never have a need to access this class directly.
+ * TarBuffers are created by Tar IO Streams.
+ *
+ */
+
+public class TarBuffer {
+
+    /** Default record size */
+    public static final int DEFAULT_RCDSIZE = (512);
+
+    /** Default block size */
+    public static final int DEFAULT_BLKSIZE = (DEFAULT_RCDSIZE * 20);
+
+    private InputStream     inStream;
+    private OutputStream    outStream;
+    private byte[]          blockBuffer;
+    private int             currBlkIdx;
+    private int             currRecIdx;
+    private int             blockSize;
+    private int             recordSize;
+    private int             recsPerBlock;
+    private boolean         debug;
+
+    /**
+     * Constructor for a TarBuffer on an input stream.
+     * @param inStream the input stream to use
+     */
+    public TarBuffer(InputStream inStream) {
+        this(inStream, TarBuffer.DEFAULT_BLKSIZE);
+    }
+
+    /**
+     * Constructor for a TarBuffer on an input stream.
+     * @param inStream the input stream to use
+     * @param blockSize the block size to use
+     */
+    public TarBuffer(InputStream inStream, int blockSize) {
+        this(inStream, blockSize, TarBuffer.DEFAULT_RCDSIZE);
+    }
+
+    /**
+     * Constructor for a TarBuffer on an input stream.
+     * @param inStream the input stream to use
+     * @param blockSize the block size to use
+     * @param recordSize the record size to use
+     */
+    public TarBuffer(InputStream inStream, int blockSize, int recordSize) {
+        this.inStream = inStream;
+        this.outStream = null;
+
+        this.initialize(blockSize, recordSize);
+    }
+
+    /**
+     * Constructor for a TarBuffer on an output stream.
+     * @param outStream the output stream to use
+     */
+    public TarBuffer(OutputStream outStream) {
+        this(outStream, TarBuffer.DEFAULT_BLKSIZE);
+    }
+
+    /**
+     * Constructor for a TarBuffer on an output stream.
+     * @param outStream the output stream to use
+     * @param blockSize the block size to use
+     */
+    public TarBuffer(OutputStream outStream, int blockSize) {
+        this(outStream, blockSize, TarBuffer.DEFAULT_RCDSIZE);
+    }
+
+    /**
+     * Constructor for a TarBuffer on an output stream.
+     * @param outStream the output stream to use
+     * @param blockSize the block size to use
+     * @param recordSize the record size to use
+     */
+    public TarBuffer(OutputStream outStream, int blockSize, int recordSize) {
+        this.inStream = null;
+        this.outStream = outStream;
+
+        this.initialize(blockSize, recordSize);
+    }
+
+    /**
+     * Initialization common to all constructors.
+     */
+    private void initialize(int blockSize, int recordSize) {
+        this.debug = false;
+        this.blockSize = blockSize;
+        this.recordSize = recordSize;
+        this.recsPerBlock = (this.blockSize / this.recordSize);
+        this.blockBuffer = new byte[this.blockSize];
+
+        if (this.inStream != null) {
+            this.currBlkIdx = -1;
+            this.currRecIdx = this.recsPerBlock;
+        } else {
+            this.currBlkIdx = 0;
+            this.currRecIdx = 0;
+        }
+    }
+
+    /**
+     * Get the TAR Buffer's block size. Blocks consist of multiple records.
+     * @return the block size
+     */
+    public int getBlockSize() {
+        return this.blockSize;
+    }
+
+    /**
+     * Get the TAR Buffer's record size.
+     * @return the record size
+     */
+    public int getRecordSize() {
+        return this.recordSize;
+    }
+
+    /**
+     * Set the debugging flag for the buffer.
+     *
+     * @param debug If true, print debugging output.
+     */
+    public void setDebug(boolean debug) {
+        this.debug = debug;
+    }
+
+    /**
+     * Determine if an archive record indicate End of Archive. End of
+     * archive is indicated by a record that consists entirely of null bytes.
+     *
+     * @param record The record data to check.
+     * @return true if the record data is an End of Archive
+     */
+    public boolean isEOFRecord(byte[] record) {
+        for (int i = 0, sz = getRecordSize(); i < sz; ++i) {
+            if (record[i] != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Skip over a record on the input stream.
+     * @throws IOException on error
+     */
+    public void skipRecord() throws IOException {
+        if (debug) {
+            System.err.println("SkipRecord: recIdx = " + currRecIdx
+                               + " blkIdx = " + currBlkIdx);
+        }
+
+        if (inStream == null) {
+            throw new IOException("reading (via skip) from an output buffer");
+        }
+
+        if (currRecIdx >= recsPerBlock) {
+            if (!readBlock()) {
+                return;    // UNDONE
+            }
+        }
+
+        currRecIdx++;
+    }
+
+    /**
+     * Read a record from the input stream and return the data.
+     *
+     * @return The record data.
+     * @throws IOException on error
+     */
+    public byte[] readRecord() throws IOException {
+        if (debug) {
+            System.err.println("ReadRecord: recIdx = " + currRecIdx
+                               + " blkIdx = " + currBlkIdx);
+        }
+
+        if (inStream == null) {
+            throw new IOException("reading from an output buffer");
+        }
+
+        if (currRecIdx >= recsPerBlock) {
+            if (!readBlock()) {
+                return null;
+            }
+        }
+
+        byte[] result = new byte[recordSize];
+
+        System.arraycopy(blockBuffer,
+                         (currRecIdx * recordSize), result, 0,
+                         recordSize);
+
+        currRecIdx++;
+
+        return result;
+    }
+
+    /**
+     * @return false if End-Of-File, else true
+     */
+    private boolean readBlock() throws IOException {
+        if (debug) {
+            System.err.println("ReadBlock: blkIdx = " + currBlkIdx);
+        }
+
+        if (inStream == null) {
+            throw new IOException("reading from an output buffer");
+        }
+
+        currRecIdx = 0;
+
+        int offset = 0;
+        int bytesNeeded = blockSize;
+
+        while (bytesNeeded > 0) {
+            long numBytes = inStream.read(blockBuffer, offset,
+                                               bytesNeeded);
+
+            //
+            // NOTE
+            // We have hit EOF, and the block is not full!
+            //
+            // This is a broken archive. It does not follow the standard
+            // blocking algorithm. However, because we are generous, and
+            // it requires little effort, we will simply ignore the error
+            // and continue as if the entire block were read. This does
+            // not appear to break anything upstream. We used to return
+            // false in this case.
+            //
+            // Thanks to 'Yohann.Roussel@alcatel.fr' for this fix.
+            //
+            if (numBytes == -1) {
+                if (offset == 0) {
+                    // Ensure that we do not read gigabytes of zeros
+                    // for a corrupt tar file.
+                    // See http://issues.apache.org/bugzilla/show_bug.cgi?id=39924
+                    return false;
+                }
+                // However, just leaving the unread portion of the buffer dirty does
+                // cause problems in some cases.  This problem is described in
+                // http://issues.apache.org/bugzilla/show_bug.cgi?id=29877
+                //
+                // The solution is to fill the unused portion of the buffer with zeros.
+
+                Arrays.fill(blockBuffer, offset, offset + bytesNeeded, (byte) 0);
+
+                break;
+            }
+
+            offset += numBytes;
+            bytesNeeded -= numBytes;
+
+            if (numBytes != blockSize) {
+                if (debug) {
+                    System.err.println("ReadBlock: INCOMPLETE READ "
+                                       + numBytes + " of " + blockSize
+                                       + " bytes read.");
+                }
+            }
+        }
+
+        currBlkIdx++;
+
+        return true;
+    }
+
+    /**
+     * Get the current block number, zero based.
+     *
+     * @return The current zero based block number.
+     */
+    public int getCurrentBlockNum() {
+        return currBlkIdx;
+    }
+
+    /**
+     * Get the current record number, within the current block, zero based.
+     * Thus, current offset = (currentBlockNum * recsPerBlk) + currentRecNum.
+     *
+     * @return The current zero based record number.
+     */
+    public int getCurrentRecordNum() {
+        return currRecIdx - 1;
+    }
+
+    /**
+     * Write an archive record to the archive.
+     *
+     * @param record The record data to write to the archive.
+     * @throws IOException on error
+     */
+    public void writeRecord(byte[] record) throws IOException {
+        if (debug) {
+            System.err.println("WriteRecord: recIdx = " + currRecIdx
+                               + " blkIdx = " + currBlkIdx);
+        }
+
+        if (outStream == null) {
+            throw new IOException("writing to an input buffer");
+        }
+
+        if (record.length != recordSize) {
+            throw new IOException("record to write has length '"
+                                  + record.length
+                                  + "' which is not the record size of '"
+                                  + recordSize + "'");
+        }
+
+        if (currRecIdx >= recsPerBlock) {
+            writeBlock();
+        }
+
+        System.arraycopy(record, 0, blockBuffer,
+                         (currRecIdx * recordSize),
+                         recordSize);
+
+        currRecIdx++;
+    }
+
+    /**
+     * Write an archive record to the archive, where the record may be
+     * inside of a larger array buffer. The buffer must be at least
+     * "offset plus record size" long.
+     *
+     * @param buf The buffer containing the record data to write.
+     * @param offset The offset of the record data within buf.
+     * @throws IOException on error
+     */
+    public void writeRecord(byte[] buf, int offset) throws IOException {
+        if (debug) {
+            System.err.println("WriteRecord: recIdx = " + currRecIdx
+                               + " blkIdx = " + currBlkIdx);
+        }
+
+        if (outStream == null) {
+            throw new IOException("writing to an input buffer");
+        }
+
+        if ((offset + recordSize) > buf.length) {
+            throw new IOException("record has length '" + buf.length
+                                  + "' with offset '" + offset
+                                  + "' which is less than the record size of '"
+                                  + recordSize + "'");
+        }
+
+        if (currRecIdx >= recsPerBlock) {
+            writeBlock();
+        }
+
+        System.arraycopy(buf, offset, blockBuffer,
+                         (currRecIdx * recordSize),
+                         recordSize);
+
+        currRecIdx++;
+    }
+
+    /**
+     * Write a TarBuffer block to the archive.
+     */
+    private void writeBlock() throws IOException {
+        if (debug) {
+            System.err.println("WriteBlock: blkIdx = " + currBlkIdx);
+        }
+
+        if (outStream == null) {
+            throw new IOException("writing to an input buffer");
+        }
+
+        outStream.write(blockBuffer, 0, blockSize);
+        outStream.flush();
+
+        currRecIdx = 0;
+        currBlkIdx++;
+        Arrays.fill(blockBuffer, (byte) 0);
+    }
+
+    /**
+     * Flush the current data block if it has any data in it.
+     */
+    void flushBlock() throws IOException {
+        if (debug) {
+            System.err.println("TarBuffer.flushBlock() called.");
+        }
+
+        if (outStream == null) {
+            throw new IOException("writing to an input buffer");
+        }
+
+        if (currRecIdx > 0) {
+            writeBlock();
+        }
+    }
+
+    /**
+     * Close the TarBuffer. If this is an output buffer, also flush the
+     * current block before closing.
+     * @throws IOException on error
+     */
+    public void close() throws IOException {
+        if (debug) {
+            System.err.println("TarBuffer.closeBuffer().");
+        }
+
+        if (outStream != null) {
+            flushBlock();
+
+            if (outStream != System.out
+                    && outStream != System.err) {
+                outStream.close();
+
+                outStream = null;
+            }
+        } else if (inStream != null) {
+            if (inStream != System.in) {
+                inStream.close();
+
+                inStream = null;
+            }
+        }
+    }
+}
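
A minimal read-side sketch of how these pieces fit together (not part of
this commit; it assumes the matching InputStream constructor defined
earlier in the file, the conventional 10240-byte blocks of 512-byte
records, and an illustrative file name):

    import java.io.FileInputStream;
    import org.apache.activemq.apollo.util.tar.TarBuffer;

    public class TarBufferReadSketch {
        public static void main(String[] args) throws Exception {
            TarBuffer buffer =
                new TarBuffer(new FileInputStream("archive.tar"), 10240, 512);
            try {
                byte[] record;
                // readRecord() refills the block buffer as needed and
                // returns null once the underlying stream is exhausted.
                while ((record = buffer.readRecord()) != null) {
                    if (buffer.isEOFRecord(record)) {
                        break; // an all-zero record marks end of archive
                    }
                    // ... process the 512-byte record ...
                }
            } finally {
                buffer.close(); // also closes the wrapped stream
            }
        }
    }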

Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarConstants.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarConstants.java?rev=1234433&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarConstants.java (added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarConstants.java Sat Jan 21 22:34:28 2012
@@ -0,0 +1,158 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.apollo.util.tar;
+
+/**
+ * This interface contains all the definitions used in the package.
+ *
+ */
+// CheckStyle:InterfaceIsTypeCheck OFF (bc)
+public interface TarConstants {
+
+    /**
+     * The length of the name field in a header buffer.
+     */
+    int    NAMELEN = 100;
+
+    /**
+     * The length of the mode field in a header buffer.
+     */
+    int    MODELEN = 8;
+
+    /**
+     * The length of the user id field in a header buffer.
+     */
+    int    UIDLEN = 8;
+
+    /**
+     * The length of the group id field in a header buffer.
+     */
+    int    GIDLEN = 8;
+
+    /**
+     * The length of the checksum field in a header buffer.
+     */
+    int    CHKSUMLEN = 8;
+
+    /**
+     * The length of the size field in a header buffer.
+     */
+    int    SIZELEN = 12;
+
+    /**
+     * The maximum size of a file in a tar archive (That's 11 sevens, octal).
+     */
+    long   MAXSIZE = 077777777777L;
+
+    /**
+     * The length of the magic field in a header buffer.
+     */
+    int    MAGICLEN = 8;
+
+    /**
+     * The length of the modification time field in a header buffer.
+     */
+    int    MODTIMELEN = 12;
+
+    /**
+     * The length of the user name field in a header buffer.
+     */
+    int    UNAMELEN = 32;
+
+    /**
+     * The length of the group name field in a header buffer.
+     */
+    int    GNAMELEN = 32;
+
+    /**
+     * The length of the devices field in a header buffer.
+     */
+    int    DEVLEN = 8;
+
+    /**
+     * LF_ constants represent the "link flag" of an entry, or more commonly,
+     * the "entry type". This is the "old way" of indicating a normal file.
+     */
+    byte   LF_OLDNORM = 0;
+
+    /**
+     * Normal file type.
+     */
+    byte   LF_NORMAL = (byte) '0';
+
+    /**
+     * Link file type.
+     */
+    byte   LF_LINK = (byte) '1';
+
+    /**
+     * Symbolic link file type.
+     */
+    byte   LF_SYMLINK = (byte) '2';
+
+    /**
+     * Character device file type.
+     */
+    byte   LF_CHR = (byte) '3';
+
+    /**
+     * Block device file type.
+     */
+    byte   LF_BLK = (byte) '4';
+
+    /**
+     * Directory file type.
+     */
+    byte   LF_DIR = (byte) '5';
+
+    /**
+     * FIFO (pipe) file type.
+     */
+    byte   LF_FIFO = (byte) '6';
+
+    /**
+     * Contiguous file type.
+     */
+    byte   LF_CONTIG = (byte) '7';
+
+    /**
+     * The magic tag representing a POSIX tar archive.
+     */
+    String TMAGIC = "ustar";
+
+    /**
+     * The magic tag representing a GNU tar archive.
+     */
+    String GNU_TMAGIC = "ustar  ";
+
+    /**
+     * The name of the GNU tar entry which contains a long name.
+     */
+    String GNU_LONGLINK = "././@LongLink";
+
+    /**
+     * Identifies the *next* file on the tape as having a long name.
+     */
+    byte LF_GNUTYPE_LONGNAME = (byte) 'L';
+}
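
Since the *LEN constants describe tar's fixed 512-byte header layout in
field order, derived offsets fall out of simple addition. A small sanity
check sketch (not part of this commit):

    // Offset of the checksum field: sum of the six fields preceding it.
    int checksumOffset = TarConstants.NAMELEN      // 100
                       + TarConstants.MODELEN      //   8
                       + TarConstants.UIDLEN       //   8
                       + TarConstants.GIDLEN       //   8
                       + TarConstants.SIZELEN      //  12
                       + TarConstants.MODTIMELEN;  //  12
    assert checksumOffset == 148;

    // MAXSIZE is 11 octal sevens: 2^33 - 1 bytes, just under 8 GiB per entry.
    assert TarConstants.MAXSIZE == (1L << 33) - 1;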

Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarEntry.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarEntry.java?rev=1234433&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarEntry.java (added)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarEntry.java Sat Jan 21 22:34:28 2012
@@ -0,0 +1,664 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/*
+ * This package is based on the work done by Timothy Gerard Endres
+ * (time@ice.com) to whom the Ant project is very grateful for his great code.
+ */
+
+package org.apache.activemq.apollo.util.tar;
+
+import java.io.File;
+import java.util.Date;
+import java.util.Locale;
+
+/**
+ * This class represents an entry in a Tar archive. It consists
+ * of the entry's header, as well as the entry's File. Entries
+ * can be instantiated in one of three ways, depending on how
+ * they are to be used.
+ * <p>
+ * TarEntries that are created from the header bytes read from
+ * an archive are instantiated with the TarEntry( byte[] )
+ * constructor. These entries will be used when extracting from
+ * or listing the contents of an archive. These entries have their
+ * header filled in using the header bytes. They also set the File
+ * to null, since they reference an archive entry not a file.
+ * <p>
+ * TarEntries that are created from Files that are to be written
+ * into an archive are instantiated with the TarEntry( File )
+ * constructor. These entries have their header filled in using
+ * the File's information. They also keep a reference to the File
+ * for convenience when writing entries.
+ * <p>
+ * Finally, TarEntries can be constructed from nothing but a name.
+ * This allows the programmer to construct the entry by hand, for
+ * instance when only an InputStream is available for writing to
+ * the archive, and the header information is constructed from
+ * other information. In this case the header fields are set to
+ * defaults and the File is set to null.
+ *
+ * <p>
+ * The C structure for a Tar Entry's header is:
+ * <pre>
+ * struct header {
+ * char name[NAMSIZ];
+ * char mode[8];
+ * char uid[8];
+ * char gid[8];
+ * char size[12];
+ * char mtime[12];
+ * char chksum[8];
+ * char linkflag;
+ * char linkname[NAMSIZ];
+ * char magic[8];
+ * char uname[TUNMLEN];
+ * char gname[TGNMLEN];
+ * char devmajor[8];
+ * char devminor[8];
+ * } header;
+ * </pre>
+ *
+ */
+
+public class TarEntry implements TarConstants {
+    /** The entry's name. */
+    private StringBuffer name;
+
+    /** The entry's permission mode. */
+    private int mode;
+
+    /** The entry's user id. */
+    private int userId;
+
+    /** The entry's group id. */
+    private int groupId;
+
+    /** The entry's size. */
+    private long size;
+
+    /** The entry's modification time. */
+    private long modTime;
+
+    /** The entry's link flag. */
+    private byte linkFlag;
+
+    /** The entry's link name. */
+    private StringBuffer linkName;
+
+    /** The entry's magic tag. */
+    private StringBuffer magic;
+
+    /** The entry's user name. */
+    private StringBuffer userName;
+
+    /** The entry's group name. */
+    private StringBuffer groupName;
+
+    /** The entry's major device number. */
+    private int devMajor;
+
+    /** The entry's minor device number. */
+    private int devMinor;
+
+    /** The entry's file reference */
+    private File file;
+
+    /** Maximum length of a user's name in the tar file */
+    public static final int MAX_NAMELEN = 31;
+
+    /** Default permissions bits for directories */
+    public static final int DEFAULT_DIR_MODE = 040755;
+
+    /** Default permissions bits for files */
+    public static final int DEFAULT_FILE_MODE = 0100644;
+
+    /** Convert millis to seconds */
+    public static final int MILLIS_PER_SECOND = 1000;
+
+    /**
+     * Construct an empty entry and prepare the header values.
+     */
+    private TarEntry () {
+        this.magic = new StringBuffer(TMAGIC);
+        this.name = new StringBuffer();
+        this.linkName = new StringBuffer();
+
+        String user = System.getProperty("user.name", "");
+
+        if (user.length() > MAX_NAMELEN) {
+            user = user.substring(0, MAX_NAMELEN);
+        }
+
+        this.userId = 0;
+        this.groupId = 0;
+        this.userName = new StringBuffer(user);
+        this.groupName = new StringBuffer("");
+        this.file = null;
+    }
+
+    /**
+     * Construct an entry with only a name. This allows the programmer
+     * to construct the entry's header "by hand". File is set to null.
+     *
+     * @param name the entry name
+     */
+    public TarEntry(String name) {
+        this(name, false);
+    }
+
+    /**
+     * Construct an entry with only a name. This allows the programmer
+     * to construct the entry's header "by hand". File is set to null.
+     *
+     * @param name the entry name
+     * @param preserveLeadingSlashes whether to allow leading slashes
+     * in the name.
+     */
+    public TarEntry(String name, boolean preserveLeadingSlashes) {
+        this();
+
+        name = normalizeFileName(name, preserveLeadingSlashes);
+        boolean isDir = name.endsWith("/");
+
+        this.devMajor = 0;
+        this.devMinor = 0;
+        this.name = new StringBuffer(name);
+        this.mode = isDir ? DEFAULT_DIR_MODE : DEFAULT_FILE_MODE;
+        this.linkFlag = isDir ? LF_DIR : LF_NORMAL;
+        this.userId = 0;
+        this.groupId = 0;
+        this.size = 0;
+        this.modTime = (new Date()).getTime() / MILLIS_PER_SECOND;
+        this.linkName = new StringBuffer("");
+        this.userName = new StringBuffer("");
+        this.groupName = new StringBuffer("");
+    }
+
+    /**
+     * Construct an entry with a name and a link flag.
+     *
+     * @param name the entry name
+     * @param linkFlag the entry link flag.
+     */
+    public TarEntry(String name, byte linkFlag) {
+        this(name);
+        this.linkFlag = linkFlag;
+        if (linkFlag == LF_GNUTYPE_LONGNAME) {
+            magic = new StringBuffer(GNU_TMAGIC);
+        }
+    }
+
+    /**
+     * Construct an entry for a file. File is set to file, and the
+     * header is constructed from information from the file.
+     *
+     * @param file The file that the entry represents.
+     */
+    public TarEntry(File file) {
+        this();
+
+        this.file = file;
+
+        String fileName = normalizeFileName(file.getPath(), false);
+        this.linkName = new StringBuffer("");
+        this.name = new StringBuffer(fileName);
+
+        if (file.isDirectory()) {
+            this.mode = DEFAULT_DIR_MODE;
+            this.linkFlag = LF_DIR;
+
+            int nameLength = name.length();
+            if (nameLength == 0 || name.charAt(nameLength - 1) != '/') {
+                this.name.append("/");
+            }
+            this.size = 0;
+        } else {
+            this.mode = DEFAULT_FILE_MODE;
+            this.linkFlag = LF_NORMAL;
+            this.size = file.length();
+        }
+
+        this.modTime = file.lastModified() / MILLIS_PER_SECOND;
+        this.devMajor = 0;
+        this.devMinor = 0;
+    }
+
+    /**
+     * Construct an entry from an archive's header bytes. File is set
+     * to null.
+     *
+     * @param headerBuf The header bytes from a tar archive entry.
+     */
+    public TarEntry(byte[] headerBuf) {
+        this();
+        parseTarHeader(headerBuf);
+    }
+
+    /**
+     * Determine if the two entries are equal. Equality is determined
+     * by the header names being equal.
+     *
+     * @param it Entry to be checked for equality.
+     * @return True if the entries are equal.
+     */
+    public boolean equals(TarEntry it) {
+        return getName().equals(it.getName());
+    }
+
+    /**
+     * Determine if the two entries are equal. Equality is determined
+     * by the header names being equal.
+     *
+     * @param it Entry to be checked for equality.
+     * @return True if the entries are equal.
+     */
+    public boolean equals(Object it) {
+        if (it == null || getClass() != it.getClass()) {
+            return false;
+        }
+        return equals((TarEntry) it);
+    }
+
+    /**
+     * Hashcodes are based on entry names.
+     *
+     * @return the entry hashcode
+     */
+    public int hashCode() {
+        return getName().hashCode();
+    }
+
+    /**
+     * Determine if the given entry is a descendant of this entry.
+     * Descendancy is determined by the name of the descendant
+     * starting with this entry's name.
+     *
+     * @param desc Entry to be checked as a descendant of this.
+     * @return True if entry is a descendant of this.
+     */
+    public boolean isDescendent(TarEntry desc) {
+        return desc.getName().startsWith(getName());
+    }
+
+    /**
+     * Get this entry's name.
+     *
+     * @return This entry's name.
+     */
+    public String getName() {
+        return name.toString();
+    }
+
+    /**
+     * Set this entry's name.
+     *
+     * @param name This entry's new name.
+     */
+    public void setName(String name) {
+        this.name = new StringBuffer(normalizeFileName(name, false));
+    }
+
+    /**
+     * Set the mode for this entry
+     *
+     * @param mode the mode for this entry
+     */
+    public void setMode(int mode) {
+        this.mode = mode;
+    }
+
+    /**
+     * Get this entry's link name.
+     *
+     * @return This entry's link name.
+     */
+    public String getLinkName() {
+        return linkName.toString();
+    }
+
+    /**
+     * Get this entry's user id.
+     *
+     * @return This entry's user id.
+     */
+    public int getUserId() {
+        return userId;
+    }
+
+    /**
+     * Set this entry's user id.
+     *
+     * @param userId This entry's new user id.
+     */
+    public void setUserId(int userId) {
+        this.userId = userId;
+    }
+
+    /**
+     * Get this entry's group id.
+     *
+     * @return This entry's group id.
+     */
+    public int getGroupId() {
+        return groupId;
+    }
+
+    /**
+     * Set this entry's group id.
+     *
+     * @param groupId This entry's new group id.
+     */
+    public void setGroupId(int groupId) {
+        this.groupId = groupId;
+    }
+
+    /**
+     * Get this entry's user name.
+     *
+     * @return This entry's user name.
+     */
+    public String getUserName() {
+        return userName.toString();
+    }
+
+    /**
+     * Set this entry's user name.
+     *
+     * @param userName This entry's new user name.
+     */
+    public void setUserName(String userName) {
+        this.userName = new StringBuffer(userName);
+    }
+
+    /**
+     * Get this entry's group name.
+     *
+     * @return This entry's group name.
+     */
+    public String getGroupName() {
+        return groupName.toString();
+    }
+
+    /**
+     * Set this entry's group name.
+     *
+     * @param groupName This entry's new group name.
+     */
+    public void setGroupName(String groupName) {
+        this.groupName = new StringBuffer(groupName);
+    }
+
+    /**
+     * Convenience method to set this entry's group and user ids.
+     *
+     * @param userId This entry's new user id.
+     * @param groupId This entry's new group id.
+     */
+    public void setIds(int userId, int groupId) {
+        setUserId(userId);
+        setGroupId(groupId);
+    }
+
+    /**
+     * Convenience method to set this entry's group and user names.
+     *
+     * @param userName This entry's new user name.
+     * @param groupName This entry's new group name.
+     */
+    public void setNames(String userName, String groupName) {
+        setUserName(userName);
+        setGroupName(groupName);
+    }
+
+    /**
+     * Set this entry's modification time. The parameter passed
+     * to this method is in "Java time".
+     *
+     * @param time This entry's new modification time.
+     */
+    public void setModTime(long time) {
+        modTime = time / MILLIS_PER_SECOND;
+    }
+
+    /**
+     * Set this entry's modification time.
+     *
+     * @param time This entry's new modification time.
+     */
+    public void setModTime(Date time) {
+        modTime = time.getTime() / MILLIS_PER_SECOND;
+    }
+
+    /**
+     * Get this entry's modification time.
+     *
+     * @return This entry's modification time.
+     */
+    public Date getModTime() {
+        return new Date(modTime * MILLIS_PER_SECOND);
+    }
+
+    /**
+     * Get this entry's file.
+     *
+     * @return This entry's file.
+     */
+    public File getFile() {
+        return file;
+    }
+
+    /**
+     * Get this entry's mode.
+     *
+     * @return This entry's mode.
+     */
+    public int getMode() {
+        return mode;
+    }
+
+    /**
+     * Get this entry's file size.
+     *
+     * @return This entry's file size.
+     */
+    public long getSize() {
+        return size;
+    }
+
+    /**
+     * Set this entry's file size.
+     *
+     * @param size This entry's new file size.
+     */
+    public void setSize(long size) {
+        this.size = size;
+    }
+
+
+    /**
+     * Indicate if this entry is a GNU long name block
+     *
+     * @return true if this is a long name extension provided by GNU tar
+     */
+    public boolean isGNULongNameEntry() {
+        return linkFlag == LF_GNUTYPE_LONGNAME
+                           && name.toString().equals(GNU_LONGLINK);
+    }
+
+    /**
+     * Return whether or not this entry represents a directory.
+     *
+     * @return True if this entry is a directory.
+     */
+    public boolean isDirectory() {
+        if (file != null) {
+            return file.isDirectory();
+        }
+
+        if (linkFlag == LF_DIR) {
+            return true;
+        }
+
+        if (getName().endsWith("/")) {
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * If this entry represents a file, and the file is a directory, return
+     * an array of TarEntries for this entry's children.
+     *
+     * @return An array of TarEntry objects for this entry's children.
+     */
+    public TarEntry[] getDirectoryEntries() {
+        if (file == null || !file.isDirectory()) {
+            return new TarEntry[0];
+        }
+
+        String[]   list = file.list();
+        TarEntry[] result = new TarEntry[list.length];
+
+        for (int i = 0; i < list.length; ++i) {
+            result[i] = new TarEntry(new File(file, list[i]));
+        }
+
+        return result;
+    }
+
+    /**
+     * Write an entry's header information to a header buffer.
+     *
+     * @param outbuf The tar entry header buffer to fill in.
+     */
+    public void writeEntryHeader(byte[] outbuf) {
+        int offset = 0;
+
+        offset = TarUtils.getNameBytes(name, outbuf, offset, NAMELEN);
+        offset = TarUtils.getOctalBytes(mode, outbuf, offset, MODELEN);
+        offset = TarUtils.getOctalBytes(userId, outbuf, offset, UIDLEN);
+        offset = TarUtils.getOctalBytes(groupId, outbuf, offset, GIDLEN);
+        offset = TarUtils.getLongOctalBytes(size, outbuf, offset, SIZELEN);
+        offset = TarUtils.getLongOctalBytes(modTime, outbuf, offset, MODTIMELEN);
+
+        int csOffset = offset;
+
+        for (int c = 0; c < CHKSUMLEN; ++c) {
+            outbuf[offset++] = (byte) ' ';
+        }
+
+        outbuf[offset++] = linkFlag;
+        offset = TarUtils.getNameBytes(linkName, outbuf, offset, NAMELEN);
+        offset = TarUtils.getNameBytes(magic, outbuf, offset, MAGICLEN);
+        offset = TarUtils.getNameBytes(userName, outbuf, offset, UNAMELEN);
+        offset = TarUtils.getNameBytes(groupName, outbuf, offset, GNAMELEN);
+        offset = TarUtils.getOctalBytes(devMajor, outbuf, offset, DEVLEN);
+        offset = TarUtils.getOctalBytes(devMinor, outbuf, offset, DEVLEN);
+
+        while (offset < outbuf.length) {
+            outbuf[offset++] = 0;
+        }
+
+        long chk = TarUtils.computeCheckSum(outbuf);
+
+        TarUtils.getCheckSumOctalBytes(chk, outbuf, csOffset, CHKSUMLEN);
+    }
+
+    /**
+     * Parse an entry's header information from a header buffer.
+     *
+     * @param header The tar entry header buffer to get information from.
+     */
+    public void parseTarHeader(byte[] header) {
+        int offset = 0;
+
+        name = TarUtils.parseName(header, offset, NAMELEN);
+        offset += NAMELEN;
+        mode = (int) TarUtils.parseOctal(header, offset, MODELEN);
+        offset += MODELEN;
+        userId = (int) TarUtils.parseOctal(header, offset, UIDLEN);
+        offset += UIDLEN;
+        groupId = (int) TarUtils.parseOctal(header, offset, GIDLEN);
+        offset += GIDLEN;
+        size = TarUtils.parseOctal(header, offset, SIZELEN);
+        offset += SIZELEN;
+        modTime = TarUtils.parseOctal(header, offset, MODTIMELEN);
+        offset += MODTIMELEN;
+        offset += CHKSUMLEN;
+        linkFlag = header[offset++];
+        linkName = TarUtils.parseName(header, offset, NAMELEN);
+        offset += NAMELEN;
+        magic = TarUtils.parseName(header, offset, MAGICLEN);
+        offset += MAGICLEN;
+        userName = TarUtils.parseName(header, offset, UNAMELEN);
+        offset += UNAMELEN;
+        groupName = TarUtils.parseName(header, offset, GNAMELEN);
+        offset += GNAMELEN;
+        devMajor = (int) TarUtils.parseOctal(header, offset, DEVLEN);
+        offset += DEVLEN;
+        devMinor = (int) TarUtils.parseOctal(header, offset, DEVLEN);
+    }
+
+    /**
+     * Strips the Windows drive letter as well as any leading slashes,
+     * and turns path separators into forward slashes.
+     */
+    private static String normalizeFileName(String fileName,
+                                            boolean preserveLeadingSlashes) {
+        String osname = System.getProperty("os.name").toLowerCase(Locale.ENGLISH);
+
+        if (osname != null) {
+
+            // Strip off drive letters!
+            // REVIEW Would a better check be "(File.separator == '\')"?
+
+            if (osname.startsWith("windows")) {
+                if (fileName.length() > 2) {
+                    char ch1 = fileName.charAt(0);
+                    char ch2 = fileName.charAt(1);
+
+                    if (ch2 == ':'
+                        && ((ch1 >= 'a' && ch1 <= 'z')
+                            || (ch1 >= 'A' && ch1 <= 'Z'))) {
+                        fileName = fileName.substring(2);
+                    }
+                }
+            } else if (osname.indexOf("netware") > -1) {
+                int colon = fileName.indexOf(':');
+                if (colon != -1) {
+                    fileName = fileName.substring(colon + 1);
+                }
+            }
+        }
+
+        fileName = fileName.replace(File.separatorChar, '/');
+
+        // No absolute pathnames
+        // Windows (and Posix?) paths can start with "\\NetworkDrive\",
+        // so we loop on starting /'s.
+        while (!preserveLeadingSlashes && fileName.startsWith("/")) {
+            fileName = fileName.substring(1);
+        }
+        return fileName;
+    }
+}
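
A minimal write-side sketch (not part of this commit; the entry name and
sizes are illustrative): build an entry "by hand" and render its header
into one 512-byte record, ready to pass to TarBuffer.writeRecord().

    TarEntry entry = new TarEntry("exports/records.dat");
    entry.setSize(1024);                          // payload length in bytes
    entry.setModTime(System.currentTimeMillis()); // stored as seconds
    byte[] header = new byte[512];
    entry.writeEntryHeader(header); // fills name, mode, size, ..., checksum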


