From commits-return-17768-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Sat Jan 21 22:34:55 2012 Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0942A903B for ; Sat, 21 Jan 2012 22:34:55 +0000 (UTC) Received: (qmail 19468 invoked by uid 500); 21 Jan 2012 22:34:54 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 19417 invoked by uid 500); 21 Jan 2012 22:34:54 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 19410 invoked by uid 99); 21 Jan 2012 22:34:54 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 21 Jan 2012 22:34:54 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 21 Jan 2012 22:34:50 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id F25FB23889B8 for ; Sat, 21 Jan 2012 22:34:29 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1234433 [1/2] - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-broker/src/test/scala/org/apache/active... Date: Sat, 21 Jan 2012 22:34:29 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120121223429.F25FB23889B8@eris.apache.org> Author: chirino Date: Sat Jan 21 22:34:28 2012 New Revision: 1234433 URL: http://svn.apache.org/viewvc?rev=1234433&view=rev Log: Switch from using zip to tar for the store export/imports files since zip files have a 4GB size limit. Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarBuffer.java activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarConstants.java activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarEntry.java activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarInputStream.java activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarOutputStream.java activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarUtils.java Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1234433&r1=1234432&r2=1234433&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original) +++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Sat Jan 21 22:34:28 2012 @@ -20,19 +20,12 @@ import dto.BDBStoreDTO import java.{lang=>jl} import java.{util=>ju} -import java.util.concurrent.atomic.AtomicInteger import collection.mutable.ListBuffer import org.apache.activemq.apollo.broker.store._ import org.apache.activemq.apollo.util._ -import java.io.{EOFException, InputStream, OutputStream} -import org.fusesource.hawtbuf.proto.{MessageBuffer, PBMessageFactory} -import org.apache.activemq.apollo.util.Log._ -import scala.Some -import java.sql.ClientInfoStatus +import java.io.{InputStream, OutputStream} import com.sleepycat.je._ -import javax.management.remote.rmi._RMIConnection_Stub -import org.apache.activemq.apollo.util.FileSupport._ -import org.fusesource.hawtbuf.{AsciiBuffer, Buffer} +import org.fusesource.hawtbuf.Buffer object BDBClient extends Log /** @@ -549,56 +542,41 @@ class BDBClient(store: BDBStore) { } } - def export_pb(streams:StreamManager[OutputStream]):Option[String] = { + def export_data(os:OutputStream):Option[String] = { try { - streams.using_version_stream{ stream=> - new AsciiBuffer("1").writeTo(stream) - } + val manager = ExportStreamManager(os, 1) with_ctx() { ctx=> import ctx._ - import PBSupport._ - streams.using_map_stream { stream => - map_db.cursor(tx) { (key,value) => - val record = new MapEntryPB.Bean - record.setKey(key) - record.setValue(value) - record.freeze().writeFramed(stream) - true - } + messages_db.cursor(tx) { (_, value) => + val record = MessagePB.FACTORY.parseUnframed(value.getData) + manager.store_message(record) + true } - streams.using_queue_stream { queue_stream => - queues_db.cursor(tx) { (_, value) => - val record:QueueRecord = value - record.freeze.writeFramed(queue_stream) - true - } + entries_db.cursor(tx) { (key, value) => + val record = QueueEntryPB.FACTORY.parseUnframed(value.getData) + manager.store_queue_entry(record) + true } - streams.using_message_stream { message_stream=> - messages_db.cursor(tx) { (_, data) => - import PBSupport._ - val pb = MessagePB.FACTORY.parseUnframed(data.getData) - pb.writeFramed(message_stream) - if( pb.hasDirectSize ) { - val buffer_cp_buffer = direct_buffer_allocator.slice(pb.getDirectOffset, pb.getDirectSize) - buffer_cp_buffer.read(message_stream) - } - true - } + queues_db.cursor(tx) { (_, value) => + val record = QueuePB.FACTORY.parseUnframed(value) + manager.store_queue(record) + true } - streams.using_queue_entry_stream { queue_entry_stream=> - entries_db.cursor(tx) { (key, value) => - val record:QueueEntryRecord = value - record.freeze.writeFramed(queue_entry_stream) - true - } + map_db.cursor(tx) { (key,value) => + val record = new MapEntryPB.Bean + record.setKey(key) + record.setValue(value) + manager.store_map_entry(record) + true } - } + + manager.finish None } catch { case x:Exception=> @@ -606,75 +584,42 @@ class BDBClient(store: BDBStore) { } } - def import_pb(streams:StreamManager[InputStream]):Option[String] = { + def import_data(is:InputStream):Option[String] = { try { - streams.using_version_stream { - stream => - var ver = read_text(stream).toInt - if (ver != 1) { - return Some("Cannot import from an export file at version: "+ver) - } + val manager = ImportStreamManager(is) + if(manager.version!=1) { + return Some("Cannot import from an export file of version: "+manager.version) } - } catch { - case e => return Some("Could not determine export format version: "+e) - } - try { - purge - def foreach[Buffer] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = { - var done = false - do { - try { - func(fact.parseFramed(stream).asInstanceOf[Buffer]) - } catch { - case x:EOFException => - done = true - } - } while( !done ) - } + purge with_ctx() { ctx=> import ctx._ - import PBSupport._ - streams.using_queue_stream { queue_stream=> - foreach[QueuePB.Buffer](queue_stream, QueuePB.FACTORY) { pb=> - val record:QueueRecord = pb - queues_db.put(tx, record.key, record) - } - } - - streams.using_map_stream { stream=> - foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb => - map_db.put(tx, pb.getKey, pb.getValue) - } - } + while(manager.getNext match { - streams.using_message_stream { message_stream=> - foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=> + case record:MessagePB.Buffer => + messages_db.put(tx, record.getMessageKey, record.toUnframedBuffer) + true - var record:MessagePB.Buffer = pb - if( record.hasDirectSize ) { - val cp = record.copy - val buffer_cp_buffer = direct_buffer_allocator.alloc(cp.getDirectSize) - cp.setDirectOffset(buffer_cp_buffer.offset) - buffer_cp_buffer.write(message_stream) - lobs_db.put(tx, pb.getMessageKey, (buffer_cp_buffer.offset, buffer_cp_buffer.size)) - record = cp.freeze - } + case record:QueueEntryPB.Buffer => + entries_db.put(tx, (record.getQueueKey, record.getQueueSeq), record.toUnframedBuffer) + add_and_get(message_refs_db, record.getMessageKey, 1, tx) + true - messages_db.put(tx, record.getMessageKey, record) - } - } + case record:QueuePB.Buffer => + queues_db.put(tx, record.getKey, record.toUnframedBuffer) + true - streams.using_queue_entry_stream { queue_entry_stream=> - foreach[QueueEntryPB.Buffer](queue_entry_stream, QueueEntryPB.FACTORY) { pb=> - val record:QueueEntryRecord = pb + case record:MapEntryPB.Buffer => + map_db.put(tx, record.getKey, record.getValue) + true - entries_db.put(tx, (record.queue_key, record.entry_seq), record) - add_and_get(message_refs_db, record.message_key, 1, tx) - } + case null => + false + }) { // keep looping } + } None Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1234433&r1=1234432&r2=1234433&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala (original) +++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala Sat Jan 21 22:34:28 2012 @@ -228,16 +228,15 @@ class BDBStore(var config:BDBStoreDTO) e * Exports the contents of the store to the provided streams. Each stream should contain * a list of framed protobuf objects with the corresponding object types. */ - def export_pb(streams:StreamManager[OutputStream], cb:(Option[String])=>Unit) = write_executor { - cb(client.export_pb(streams)) + def export_data(os:OutputStream, cb:(Option[String])=>Unit) = write_executor { + cb(client.export_data(os)) } /** * Imports a previously exported set of streams. This deletes any previous data * in the store. */ - def import_pb(streams:StreamManager[InputStream], cb:(Option[String])=>Unit) = write_executor { - cb(client.import_pb(streams)) + def import_data(is:InputStream, cb:(Option[String])=>Unit) = write_executor { + cb(client.import_data(is)) } - } Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1234433&r1=1234432&r2=1234433&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Sat Jan 21 22:34:28 2012 @@ -48,14 +48,14 @@ object PBSupport { rc } - def encode_message_record(out: OutputStream, v: MessageRecord) = to_pb(v).freeze.writeUnframed(out) - def decode_message_record(in: InputStream):MessageRecord = MessagePB.FACTORY.parseUnframed(in) + def encode_message_record(out: OutputStream, v: MessageRecord) = to_pb(v).freeze.writeFramed(out) + def decode_message_record(in: InputStream):MessageRecord = MessagePB.FACTORY.parseFramed(in) - implicit def encode_message_record(v: MessageRecord):Array[Byte] = to_pb(v).freeze.toUnframedByteArray - implicit def decode_message_record(data: Array[Byte]):MessageRecord = MessagePB.FACTORY.parseUnframed(data) + implicit def encode_message_record(v: MessageRecord):Array[Byte] = to_pb(v).freeze.toFramedByteArray + implicit def decode_message_record(data: Array[Byte]):MessageRecord = MessagePB.FACTORY.parseFramed(data) - implicit def encode_message_record_buffer(v: MessageRecord) = to_pb(v).freeze.toUnframedBuffer - implicit def decode_message_record_buffer(data: Buffer):MessageRecord = MessagePB.FACTORY.parseUnframed(data) + implicit def encode_message_record_buffer(v: MessageRecord) = to_pb(v).freeze.toFramedBuffer + implicit def decode_message_record_buffer(data: Buffer):MessageRecord = MessagePB.FACTORY.parseFramed(data) implicit def to_pb(v: QueueRecord):QueuePB.Bean = { @@ -70,14 +70,14 @@ object PBSupport { QueueRecord(pb.getKey, pb.getBindingKind, pb.getBindingData) } - def encode_queue_record(out: OutputStream, v: QueueRecord) = to_pb(v).freeze.writeUnframed(out) - def decode_queue_record(in: InputStream):QueueRecord = QueuePB.FACTORY.parseUnframed(in) + def encode_queue_record(out: OutputStream, v: QueueRecord) = to_pb(v).freeze.writeFramed(out) + def decode_queue_record(in: InputStream):QueueRecord = QueuePB.FACTORY.parseFramed(in) - implicit def encode_queue_record(v: QueueRecord) = to_pb(v).freeze.toUnframedByteArray - implicit def decode_queue_record(data: Array[Byte]):QueueRecord = QueuePB.FACTORY.parseUnframed(data) + implicit def encode_queue_record(v: QueueRecord) = to_pb(v).freeze.toFramedByteArray + implicit def decode_queue_record(data: Array[Byte]):QueueRecord = QueuePB.FACTORY.parseFramed(data) - implicit def encode_queue_record_buffer(v: QueueRecord) = to_pb(v).freeze.toUnframedBuffer - implicit def decode_queue_record_buffer(data: Buffer):QueueRecord = QueuePB.FACTORY.parseUnframed(data) + implicit def encode_queue_record_buffer(v: QueueRecord) = to_pb(v).freeze.toFramedBuffer + implicit def decode_queue_record_buffer(data: Buffer):QueueRecord = QueuePB.FACTORY.parseFramed(data) implicit def to_pb(v: QueueEntryRecord):QueueEntryPB.Bean = { val pb = new QueueEntryPB.Bean @@ -105,13 +105,13 @@ object PBSupport { rc } - def encode_queue_entry_record(out: OutputStream, v: QueueEntryRecord) = to_pb(v).freeze.writeUnframed(out) - def decode_queue_entry_record(in: InputStream):QueueEntryRecord = QueueEntryPB.FACTORY.parseUnframed(in) + def encode_queue_entry_record(out: OutputStream, v: QueueEntryRecord) = to_pb(v).freeze.writeFramed(out) + def decode_queue_entry_record(in: InputStream):QueueEntryRecord = QueueEntryPB.FACTORY.parseFramed(in) - implicit def encode_queue_entry_record(v: QueueEntryRecord) = to_pb(v).freeze.toUnframedByteArray - implicit def decode_queue_entry_record(data: Array[Byte]):QueueEntryRecord = QueueEntryPB.FACTORY.parseUnframed(data) + implicit def encode_queue_entry_record(v: QueueEntryRecord) = to_pb(v).freeze.toFramedByteArray + implicit def decode_queue_entry_record(data: Array[Byte]):QueueEntryRecord = QueueEntryPB.FACTORY.parseFramed(data) - implicit def encode_queue_entry_record_buffer(v: QueueEntryRecord) = to_pb(v).freeze.toUnframedBuffer - implicit def decode_queue_entry_record_buffer(data: Buffer):QueueEntryRecord = QueueEntryPB.FACTORY.parseUnframed(data) + implicit def encode_queue_entry_record_buffer(v: QueueEntryRecord) = to_pb(v).freeze.toFramedBuffer + implicit def decode_queue_entry_record_buffer(data: Buffer):QueueEntryRecord = QueueEntryPB.FACTORY.parseFramed(data) } \ No newline at end of file Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1234433&r1=1234432&r2=1234433&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala Sat Jan 21 22:34:28 2012 @@ -18,48 +18,88 @@ package org.apache.activemq.apollo.broke */ import org.apache.activemq.apollo.dto.StoreStatusDTO import org.apache.activemq.apollo.util._ -import java.io.{InputStream, OutputStream} import java.util.concurrent.atomic.AtomicReference -import org.fusesource.hawtbuf.Buffer -import java.util.zip.{ZipFile, ZipEntry, ZipOutputStream} -import FileSupport._ - -trait StreamManager[A] { - def using_version_stream(func: (A)=>Unit) - def using_map_stream(func: (A)=>Unit) - def using_queue_stream(func: (A)=>Unit) - def using_message_stream(func: (A)=>Unit) - def using_queue_entry_stream(func: (A)=>Unit) -} +import org.apache.activemq.apollo.util.tar._ +import java.util.zip.{GZIPInputStream, GZIPOutputStream} +import org.fusesource.hawtbuf.{AsciiBuffer, Buffer} +import java.io._ +import org.apache.activemq.apollo.util.FileSupport._ +import org.fusesource.hawtbuf.proto.MessageBuffer + +case class ExportStreamManager(target:OutputStream, version:Int) { + val stream = new TarOutputStream(new GZIPOutputStream(target)) + + var seq:Long = 0; + + def finish = stream.close() + + private def store(ext:String, value:Buffer) = { + var entry = new TarEntry(seq.toString + "." + ext) + seq += 1 + entry.setSize(value.length()) + stream.putNextEntry(entry); + value.writeTo(stream) + stream.closeEntry(); + } + + private def store(ext:String, value:MessageBuffer[_,_]) = { + var entry = new TarEntry(seq.toString + "." + ext) + seq += 1 + entry.setSize(value.serializedSizeFramed()) + stream.putNextEntry(entry); + value.writeFramed(stream) + stream.closeEntry(); + } + + store("ver", new AsciiBuffer(version.toString)) + + def store_queue(value:QueuePB.Getter) = { + store("que", value.freeze()) + } + def store_queue_entry(value:QueueEntryPB.Getter) = { + store("qen", value.freeze()) + } + def store_message(value:MessagePB.Getter) = { + store("msg", value.freeze()) + } + def store_map_entry(value:MapEntryPB.Getter) = { + store("map", value.freeze()) + } -case class ZipOputputStreamManager(out:ZipOutputStream) extends StreamManager[OutputStream]() { - def entry(name:String, func: (OutputStream) => Unit) = { - out.putNextEntry(new ZipEntry(name)); - func(out) - out.closeEntry(); - } - def using_version_stream(func: (OutputStream) => Unit) = entry("version.txt", func) - def using_queue_stream(func: (OutputStream) => Unit) = entry("queues.dat", func) - def using_queue_entry_stream(func: (OutputStream) => Unit) = entry("queue_entries.dat", func) - def using_message_stream(func: (OutputStream) => Unit) = entry("messages.dat", func) - def using_map_stream(func: (OutputStream) => Unit) = entry("map.dat", func) } -case class ZipInputStreamManager(zip:ZipFile) extends StreamManager[InputStream]() { - def entry(name:String, func: (InputStream) => Unit) = { - val entry = zip.getEntry(name) - if(entry == null) { - sys.error("Invalid data file, zip entry not found: "+name); +case class ImportStreamManager(source:InputStream) { + + val stream = new TarInputStream(new GZIPInputStream(source)) + + val version = try { + var entry = stream.getNextEntry + if( entry.getName != "0.ver" ) { + throw new Exception("0.ver entry missing") } - using(zip.getInputStream(entry)) { is=> - func(is) + read_text(stream).toInt + } catch { + case e => new IOException("Could not determine export format version: "+e) + } + + def getNext:AnyRef = { + var entry = stream.getNextEntry + if( entry==null ) { + return null; + } + + if( entry.getName.endsWith(".qen") ) { + QueueEntryPB.FACTORY.parseFramed(stream) + } else if( entry.getName.endsWith(".msg") ) { + MessagePB.FACTORY.parseFramed(stream) + } else if( entry.getName.endsWith(".que") ) { + QueuePB.FACTORY.parseFramed(stream) + } else if( entry.getName.endsWith(".map") ) { + MapEntryPB.FACTORY.parseFramed(stream) + } else { + throw new Exception("Unknown entry: "+entry.getName) } } - def using_version_stream(func: (InputStream) => Unit) = entry("version.txt", func) - def using_queue_stream(func: (InputStream) => Unit) = entry("queues.dat", func) - def using_queue_entry_stream(func: (InputStream) => Unit) = entry("queue_entries.dat", func) - def using_message_stream(func: (InputStream) => Unit) = entry("messages.dat", func) - def using_map_stream(func: (InputStream) => Unit) = entry("map.dat", func) } @@ -146,14 +186,12 @@ trait Store extends ServiceTrait { def load_message(messageKey:Long, locator:AtomicReference[Object])(callback:(Option[MessageRecord])=>Unit ) /** - * Exports the contents of the store to the provided streams. Each stream should contain - * a list of framed protobuf objects with the corresponding object types. + * Exports the contents of the store to the provided stream. */ - def export_pb(streams:StreamManager[OutputStream], cb:(Option[String])=>Unit):Unit + def export_data(os:OutputStream, cb:(Option[String])=>Unit):Unit /** - * Imports a previously exported set of streams. This deletes any previous data - * in the store. + * Imports a previous export from the input stream. */ - def import_pb(streams:StreamManager[InputStream], cb:(Option[String])=>Unit):Unit + def import_data(is:InputStream, cb:(Option[String])=>Unit):Unit } \ No newline at end of file Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1234433&r1=1234432&r2=1234433&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Sat Jan 21 22:34:28 2012 @@ -18,13 +18,12 @@ package org.apache.activemq.apollo.broke */ import org.fusesource.hawtbuf.AsciiBuffer._ import org.fusesource.hawtdispatch.TaskTracker -import java.util.concurrent.{TimeUnit, CountDownLatch} +import java.util.concurrent.TimeUnit import collection.mutable.ListBuffer import org.apache.activemq.apollo.util.{LoggingTracker, FunSuiteSupport, LongCounter} import org.scalatest.BeforeAndAfterEach import org.apache.activemq.apollo.util.FileSupport._ import java.util.concurrent.atomic.AtomicReference -import java.util.zip.{ZipFile, ZipOutputStream, ZipEntry} import java.io._ import org.apache.activemq.apollo.util.sync_cb @@ -144,15 +143,13 @@ abstract class StoreFunSuiteSupport exte rc.get.buffer } - val file = test_data_dir / "export.zip" + val file = test_data_dir / "export.tgz" file.getParentFile.mkdirs() - using( new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(file)))) { out=> - val manager = ZipOputputStreamManager(out) - + using( new BufferedOutputStream(new FileOutputStream(file))) { os => // Export the data... expect(None) { sync_cb[Option[String]] { cb => - store.export_pb(manager, cb) + store.export_data(os, cb) } } } @@ -166,16 +163,12 @@ abstract class StoreFunSuiteSupport exte } // Import the data.. - val zip = new ZipFile(file) - try { - val manager = ZipInputStreamManager(zip) + using(new BufferedInputStream(new FileInputStream(file))) { is => expect(None) { sync_cb[Option[String]] { cb => - store.import_pb(manager, cb) + store.import_data(is, cb) } } - } finally { - zip.close } // The data should be there now again.. Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala?rev=1234433&r1=1234432&r2=1234433&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala Sat Jan 21 22:34:28 2012 @@ -25,8 +25,8 @@ import org.apache.felix.service.command. import org.apache.activemq.apollo.broker.ConfigStore import java.io._ import java.util.concurrent.CountDownLatch -import org.apache.activemq.apollo.broker.store.ZipOputputStreamManager._ -import org.apache.activemq.apollo.broker.store.{ZipOputputStreamManager, StreamManager, StoreFactory} +import org.apache.activemq.apollo.broker.store.ExportStreamManager._ +import org.apache.activemq.apollo.broker.store.{ExportStreamManager, StreamManager, StoreFactory} /** * The apollo stop command @@ -42,7 +42,7 @@ class StoreExport extends Action { @option(name = "--virtual-host", description = "The id of the virtual host to export, if not specified, the default virtual host is selected.") var host: String = _ - @argument(name = "file", description = "The zip file to hold the exported data", index=0, required=true) + @argument(name = "file", description = "The compressed tar file to hold the exported data", index=0, required=true) var file:File = _ def execute(session: CommandSession):AnyRef = { @@ -79,15 +79,13 @@ class StoreExport extends Action { error("Could not create the store.") } + session.getConsole.println("Starting store: "+store) ServiceControl.start(store, "store startup") - session.getConsole.println("Exporting... (this might take a while)") - using( new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(file)))) { out=> - out.setMethod(ZipEntry.DEFLATED) - out.setLevel(9) - val manager = ZipOputputStreamManager(out) + session.getConsole.println("Exporting... (this might take a while)") + using( ExportStreamManager(new BufferedOutputStream(new FileOutputStream(file)))) { manager=> sync_cb[Option[String]] { cb => - store.export_pb(manager, cb) + store.export_data(manager, cb) }.foreach(error _) } ServiceControl.stop(store, "store stop"); Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala?rev=1234433&r1=1234432&r2=1234433&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala Sat Jan 21 22:34:28 2012 @@ -22,11 +22,11 @@ import org.apache.activemq.apollo.dto.Vi import org.apache.activemq.apollo.util._ import scala.util.continuations._ import java.util.zip.ZipFile -import java.io.{InputStream, File} import org.apache.felix.service.command.CommandSession import org.apache.activemq.apollo.broker.ConfigStore -import org.apache.activemq.apollo.broker.store.ZipInputStreamManager._ -import org.apache.activemq.apollo.broker.store.{ZipInputStreamManager, StreamManager, StoreFactory} +import org.apache.activemq.apollo.broker.store.ImportStreamManager._ +import org.apache.activemq.apollo.broker.store.{ImportStreamManager, StreamManager, StoreFactory} +import java.io.{FileInputStream, BufferedInputStream, InputStream, File} /** @@ -43,7 +43,7 @@ class StoreImport extends Action { @option(name = "--virtual-host", description = "The id of the virtual host to import into, if not specified, the default virtual host is selected.") var host: String = _ - @argument(name = "file", description = "The zip file the contains that data for the import", index=0, required=true) + @argument(name = "file", description = "The compressed tar file the contains that data for the import", index=0, required=true) var file:File = _ def execute(session: CommandSession):AnyRef = { @@ -80,17 +80,14 @@ class StoreImport extends Action { error("Could not create the store.") } + session.getConsole.println("Starting store: "+store) ServiceControl.start(store, "store startup") session.getConsole.println("Importing: "+file) - val zip = new ZipFile(file) - try { - val manager = ZipInputStreamManager(zip) + using( ImportStreamManager(new BufferedInputStream(new FileInputStream(file)))) { manager => sync_cb[Option[String]] { cb => - store.import_pb(manager, cb) + store.import_data(manager, cb) }.foreach(error _) - } finally { - zip.close } ServiceControl.stop(store, "store stop"); Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1234433&r1=1234432&r2=1234433&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Sat Jan 21 22:34:28 2012 @@ -346,7 +346,7 @@ class LevelDBClient(store: LevelDBStore) case LOG_ADD_MESSAGE => case LOG_ADD_QUEUE_ENTRY => replay_operations+=1 - val record = QueueEntryPB.FACTORY.parseUnframed(data) + val record = QueueEntryPB.FACTORY.parseFramed(data) val pos = decode_vlong(record.getMessageLocator) index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), data) pos.foreach(log_ref_increment(_)) @@ -354,7 +354,7 @@ class LevelDBClient(store: LevelDBStore) case LOG_REMOVE_QUEUE_ENTRY => replay_operations+=1 index.get(data, new ReadOptions).foreach { value=> - val record = QueueEntryPB.FACTORY.parseUnframed(value) + val record = QueueEntryPB.FACTORY.parseFramed(value) val pos = decode_vlong(record.getMessageLocator) pos.foreach(log_ref_decrement(_)) index.delete(data) @@ -362,7 +362,7 @@ class LevelDBClient(store: LevelDBStore) case LOG_ADD_QUEUE => replay_operations+=1 - val record = QueuePB.FACTORY.parseUnframed(data) + val record = QueuePB.FACTORY.parseFramed(data) index.put(encode_key(queue_prefix, record.getKey), data) case LOG_REMOVE_QUEUE => @@ -377,7 +377,7 @@ class LevelDBClient(store: LevelDBStore) // Figure out what log file that message entry was in so we can, // decrement the log file reference. - val record = QueueEntryPB.FACTORY.parseUnframed(value) + val record = QueueEntryPB.FACTORY.parseFramed(value) val pos = decode_vlong(record.getMessageLocator) log_ref_decrement(pos) true @@ -385,7 +385,7 @@ class LevelDBClient(store: LevelDBStore) case LOG_MAP_ENTRY => replay_operations+=1 - val entry = MapEntryPB.FACTORY.parseUnframed(data) + val entry = MapEntryPB.FACTORY.parseFramed(data) if (entry.getValue == null) { index.delete(encode_key(map_prefix, entry.getKey)) } else { @@ -421,7 +421,7 @@ class LevelDBClient(store: LevelDBStore) index.cursor_prefixed(queue_entry_prefix_array) { (key, value)=> try { val (_, queue_key, seq_key) = decode_long_long_key(key) - val record = QueueEntryPB.FACTORY.parseUnframed(value) + val record = QueueEntryPB.FACTORY.parseFramed(value) val (pos, len) = decode_locator(record.getMessageLocator) if (record.getQueueKey != queue_key) { throw new IOException("key missmatch") @@ -448,7 +448,7 @@ class LevelDBClient(store: LevelDBStore) index.cursor_prefixed(queue_prefix_array) { (key, value)=> try { val (_, queue_key) = decode_long_key(key) - val record = QueuePB.FACTORY.parseUnframed(value) + val record = QueuePB.FACTORY.parseFramed(value) if (record.getKey != queue_key) { throw new IOException("key missmatch") } @@ -469,7 +469,7 @@ class LevelDBClient(store: LevelDBStore) trace("invalid queue entry record: %s, error: queue key does not exits %s", new Buffer(key), queue_key) fixed_records += 1 index.delete(key) - val record = QueueEntryPB.FACTORY.parseUnframed(value) + val record = QueueEntryPB.FACTORY.parseFramed(value) val pos = decode_vlong(record.getMessageLocator) log.log_info(pos).foreach { log_info => actual_log_refs.get(log_info.position).foreach { counter => @@ -718,7 +718,7 @@ class LevelDBClient(store: LevelDBStore) // Figure out what log file that message entry was in so we can, // decrement the log file reference. - val record = QueueEntryPB.FACTORY.parseUnframed(value) + val record = QueueEntryPB.FACTORY.parseFramed(value) val pos = decode_vlong(record.getMessageLocator) log_ref_decrement(pos) true @@ -745,7 +745,7 @@ class LevelDBClient(store: LevelDBStore) entry.setValue(value) batch.put(encode_key(map_prefix, key), value.toByteArray) } - appender.append(LOG_MAP_ENTRY, entry.freeze().toUnframedByteArray) + appender.append(LOG_MAP_ENTRY, entry.freeze().toFramedByteArray) } uow.actions.foreach { case (msg, action) => @@ -783,7 +783,7 @@ class LevelDBClient(store: LevelDBStore) val record = PBSupport.to_pb(entry) record.setMessageLocator(locator_buffer) - val encoded = record.freeze().toUnframedBuffer + val encoded = record.freeze().toFramedBuffer appender.append(LOG_ADD_QUEUE_ENTRY, encoded) batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded) @@ -906,7 +906,7 @@ class LevelDBClient(store: LevelDBStore) group.first_entry_seq = current_key } - val entry = QueueEntryPB.FACTORY.parseUnframed(value) + val entry = QueueEntryPB.FACTORY.parseFramed(value) val pos = decode_vlong(entry.getMessageLocator) group.last_entry_seq = current_key @@ -947,7 +947,7 @@ class LevelDBClient(store: LevelDBStore) val start = encode_key(queue_entry_prefix, queue_key, firstSeq) val end = encode_key(queue_entry_prefix, queue_key, lastSeq+1) index.cursor_range( start, end, ro ) { (key, value) => - val record = QueueEntryPB.FACTORY.parseUnframed(value) + val record = QueueEntryPB.FACTORY.parseFramed(value) val entry = PBSupport.from_pb(record) entry.message_locator = new AtomicReference[Object](decode_locator(record.getMessageLocator)) rc += entry @@ -1063,11 +1063,9 @@ class LevelDBClient(store: LevelDBStore) // } - def export_pb(streams:StreamManager[OutputStream]):Option[String] = { + def export_data(os:OutputStream):Option[String] = { try { - streams.using_version_stream{ stream=> - new AsciiBuffer("1").writeTo(stream) - } + val manager = ExportStreamManager(os, 1) retry_using_index { @@ -1078,68 +1076,62 @@ class LevelDBClient(store: LevelDBStore) } index.snapshot { snapshot=> - val ro = new ReadOptions - ro.snapshot(snapshot) - ro.verifyChecksums(verify_checksums) - ro.fillCache(false) + val nocache = new ReadOptions + nocache.snapshot(snapshot) + nocache.verifyChecksums(verify_checksums) + nocache.fillCache(false) - def write_framed(stream:OutputStream, value:Array[Byte]) = { - val helper = new AbstractVarIntSupport { - def readByte: Byte = throw new UnsupportedOperationException - def writeByte(value: Int) = stream.write(value) - } - helper.writeVarInt(value.length) - stream.write(value); + val cache = new ReadOptions + nocache.snapshot(snapshot) + nocache.verifyChecksums(false) + nocache.fillCache(false) + + // Build a temp table of all references messages by the queues + // Remember 2 queues could reference the same message. + index.cursor_prefixed(queue_entry_prefix_array, cache) { (_, value) => + val record = QueueEntryPB.FACTORY.parseFramed(value) + val (pos, len) = decode_locator(record.getMessageLocator) + index.put(encode_key(tmp_prefix, pos), encode_vlong(len)) true } - streams.using_map_stream { stream=> - index.cursor_prefixed(map_prefix_array, ro) { (key, value) => - val key_buffer = new Buffer(key) - key_buffer.moveHead(1) - val record = new MapEntryPB.Bean - record.setKey(key_buffer) - record.setValue(new Buffer(value)) - record.freeze().writeFramed(stream) - true + // Use the temp table to export all the referenced messages. Use + // the log position as the message key. + index.cursor_prefixed(Array(tmp_prefix)) { (key, value) => + val (_, pos) = decode_long_key(key) + val len = decode_vlong(value).toInt + log.read(pos, len).foreach { value => + // Set the message key to be the position in the log. + val record = MessagePB.FACTORY.parseFramed(value).copy + record.setMessageKey(pos) + manager.store_message(record) } + true } - streams.using_queue_stream { stream => - index.cursor_prefixed(queue_prefix_array) { (_, value) => - write_framed(stream, value) - } + // Now export the queue entries + index.cursor_prefixed(queue_entry_prefix_array, nocache) { (_, value) => + val record = QueueEntryPB.FACTORY.parseFramed(value).copy() + val (pos, len) = decode_locator(record.getMessageLocator) + record.setMessageKey(pos) + manager.store_queue_entry(record) + true } - // Figure out the active log locations.. - streams.using_queue_entry_stream { stream=> - index.cursor_prefixed(queue_entry_prefix_array, ro) { (_, value) => - val record = QueueEntryPB.FACTORY.parseUnframed(value).copy() - val (pos, len) = decode_locator(record.getMessageLocator) - record.setMessageKey(pos) - write_framed(stream, record.freeze().toUnframedByteArray) - index.put(encode_key(tmp_prefix, pos), encode_vlong(len)) - true - } + index.cursor_prefixed(queue_prefix_array) { (_, value) => + val record = QueuePB.FACTORY.parseFramed(value) + manager.store_queue(record) + true } - streams.using_message_stream { stream=> - index.cursor_prefixed(Array(tmp_prefix)) { (key, value) => - val (_, pos) = decode_long_key(key) - val len = decode_vlong(value).toInt - log.read(pos, len).foreach { value => - // Set the message key to be the position in the log. - val copy = MessagePB.FACTORY.parseUnframed(value).copy - copy.setMessageKey(pos) - val data = copy.freeze.toUnframedBuffer - - val check = MessagePB.FACTORY.parseUnframed(data).copy - assert(check.getMessageKey == pos) - - write_framed(stream, data) - } - true - } + index.cursor_prefixed(map_prefix_array, nocache) { (key, value) => + val key_buffer = new Buffer(key) + key_buffer.moveHead(1) + val record = new MapEntryPB.Bean + record.setKey(key_buffer) + record.setValue(new Buffer(value)) + manager.store_map_entry(record) + true } } @@ -1151,84 +1143,65 @@ class LevelDBClient(store: LevelDBStore) } } + manager.finish + None } catch { case x:Exception=> + debug(x, "Export failed") + x.printStackTrace() Some(x.getMessage) } } - def import_pb(streams:StreamManager[InputStream]):Option[String] = { + def import_data(is:InputStream):Option[String] = { try { - streams.using_version_stream { - stream => - var ver = read_text(stream).toInt - if (ver != 1) { - return Some("Cannot import from an export file at version: "+ver) - } + val manager = ImportStreamManager(is) + if(manager.version!=1) { + return Some("Cannot import from an export file of version: "+manager.version) } - } catch { - case e => return Some("Could not determine export format version: "+e) - } - try { purge - log_refs.clear() - - val actual_log_refs = HashMap[Long, LongCounter]() - - retry_using_index { - def foreach[Buffer] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = { - var done = false - do { - try { - func(fact.parseFramed(stream).asInstanceOf[Buffer]) - } catch { - case x:EOFException => - done = true - } - } while( !done ) - } + retry_using_index { log.appender { appender => - - streams.using_map_stream { stream=> - foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb => - index.put(encode_key(map_prefix, pb.getKey), pb.getValue.toByteArray) - } - } - - streams.using_queue_stream { stream=> - foreach[QueuePB.Buffer](stream, QueuePB.FACTORY) { record=> - index.put(encode_key(queue_prefix, record.getKey), record.toUnframedByteArray) - } - } + while(manager.getNext match { - streams.using_message_stream { stream=> - foreach[MessagePB.Buffer](stream, MessagePB.FACTORY) { record=> - val message_data = record.toUnframedBuffer + case record:MessagePB.Buffer => + val message_data = record.toFramedBuffer val pos = appender.append(LOG_ADD_MESSAGE, message_data) index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, message_data.length)) - } - } + true - streams.using_queue_entry_stream { stream=> - foreach[QueueEntryPB.Buffer](stream, QueueEntryPB.FACTORY) { record=> + case record:QueueEntryPB.Buffer => val copy = record.copy(); var original_msg_key: Long = record.getMessageKey index.get(encode_key(tmp_prefix, original_msg_key)) match { case Some(locator)=> val (pos, len) = decode_locator(locator) copy.setMessageLocator(locator) - index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toUnframedBuffer) + index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toFramedBuffer) log.log_info(pos).foreach { log_info => log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet() } case None => println("Invalid queue entry, references message that was not in the export: "+original_msg_key) } - } + true + + case record:QueuePB.Buffer => + index.put(encode_key(queue_prefix, record.getKey), record.toFramedBuffer) + true + + case record:MapEntryPB.Buffer => + index.put(encode_key(map_prefix, record.getKey), record.getValue) + true + + case null => + false + }) { // keep looping } + } } @@ -1244,6 +1217,7 @@ class LevelDBClient(store: LevelDBStore) } catch { case x:Exception=> + debug(x, "Import failed") Some(x.getMessage) } } Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1234433&r1=1234432&r2=1234433&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala Sat Jan 21 22:34:28 2012 @@ -290,16 +290,16 @@ class LevelDBStore(val config:LevelDBSto * Exports the contents of the store to the provided streams. Each stream should contain * a list of framed protobuf objects with the corresponding object types. */ - def export_pb(streams:StreamManager[OutputStream], cb:(Option[String])=>Unit) = write_executor { - cb(client.export_pb(streams)) + def export_data(os:OutputStream, cb:(Option[String])=>Unit) = write_executor { + cb(client.export_data(os)) } /** * Imports a previously exported set of streams. This deletes any previous data * in the store. */ - def import_pb(streams:StreamManager[InputStream], cb:(Option[String])=>Unit) = write_executor { - cb(client.import_pb(streams)) + def import_data(is:InputStream, cb:(Option[String])=>Unit) = write_executor { + cb(client.import_data(is)) } } Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarBuffer.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarBuffer.java?rev=1234433&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarBuffer.java (added) +++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarBuffer.java Sat Jan 21 22:34:28 2012 @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * This package is based on the work done by Timothy Gerard Endres + * (time@ice.com) to whom the Ant project is very grateful for his great code. + */ + +package org.apache.activemq.apollo.util.tar; + +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.util.Arrays; + +/** + * The TarBuffer class implements the tar archive concept + * of a buffered input stream. This concept goes back to the + * days of blocked tape drives and special io devices. In the + * Java universe, the only real function that this class + * performs is to ensure that files have the correct "block" + * size, or other tars will complain. + *

+ * You should never have a need to access this class directly. + * TarBuffers are created by Tar IO Streams. + * + */ + +public class TarBuffer { + + /** Default record size */ + public static final int DEFAULT_RCDSIZE = (512); + + /** Default block size */ + public static final int DEFAULT_BLKSIZE = (DEFAULT_RCDSIZE * 20); + + private InputStream inStream; + private OutputStream outStream; + private byte[] blockBuffer; + private int currBlkIdx; + private int currRecIdx; + private int blockSize; + private int recordSize; + private int recsPerBlock; + private boolean debug; + + /** + * Constructor for a TarBuffer on an input stream. + * @param inStream the input stream to use + */ + public TarBuffer(InputStream inStream) { + this(inStream, TarBuffer.DEFAULT_BLKSIZE); + } + + /** + * Constructor for a TarBuffer on an input stream. + * @param inStream the input stream to use + * @param blockSize the block size to use + */ + public TarBuffer(InputStream inStream, int blockSize) { + this(inStream, blockSize, TarBuffer.DEFAULT_RCDSIZE); + } + + /** + * Constructor for a TarBuffer on an input stream. + * @param inStream the input stream to use + * @param blockSize the block size to use + * @param recordSize the record size to use + */ + public TarBuffer(InputStream inStream, int blockSize, int recordSize) { + this.inStream = inStream; + this.outStream = null; + + this.initialize(blockSize, recordSize); + } + + /** + * Constructor for a TarBuffer on an output stream. + * @param outStream the output stream to use + */ + public TarBuffer(OutputStream outStream) { + this(outStream, TarBuffer.DEFAULT_BLKSIZE); + } + + /** + * Constructor for a TarBuffer on an output stream. + * @param outStream the output stream to use + * @param blockSize the block size to use + */ + public TarBuffer(OutputStream outStream, int blockSize) { + this(outStream, blockSize, TarBuffer.DEFAULT_RCDSIZE); + } + + /** + * Constructor for a TarBuffer on an output stream. + * @param outStream the output stream to use + * @param blockSize the block size to use + * @param recordSize the record size to use + */ + public TarBuffer(OutputStream outStream, int blockSize, int recordSize) { + this.inStream = null; + this.outStream = outStream; + + this.initialize(blockSize, recordSize); + } + + /** + * Initialization common to all constructors. + */ + private void initialize(int blockSize, int recordSize) { + this.debug = false; + this.blockSize = blockSize; + this.recordSize = recordSize; + this.recsPerBlock = (this.blockSize / this.recordSize); + this.blockBuffer = new byte[this.blockSize]; + + if (this.inStream != null) { + this.currBlkIdx = -1; + this.currRecIdx = this.recsPerBlock; + } else { + this.currBlkIdx = 0; + this.currRecIdx = 0; + } + } + + /** + * Get the TAR Buffer's block size. Blocks consist of multiple records. + * @return the block size + */ + public int getBlockSize() { + return this.blockSize; + } + + /** + * Get the TAR Buffer's record size. + * @return the record size + */ + public int getRecordSize() { + return this.recordSize; + } + + /** + * Set the debugging flag for the buffer. + * + * @param debug If true, print debugging output. + */ + public void setDebug(boolean debug) { + this.debug = debug; + } + + /** + * Determine if an archive record indicate End of Archive. End of + * archive is indicated by a record that consists entirely of null bytes. + * + * @param record The record data to check. + * @return true if the record data is an End of Archive + */ + public boolean isEOFRecord(byte[] record) { + for (int i = 0, sz = getRecordSize(); i < sz; ++i) { + if (record[i] != 0) { + return false; + } + } + + return true; + } + + /** + * Skip over a record on the input stream. + * @throws IOException on error + */ + public void skipRecord() throws IOException { + if (debug) { + System.err.println("SkipRecord: recIdx = " + currRecIdx + + " blkIdx = " + currBlkIdx); + } + + if (inStream == null) { + throw new IOException("reading (via skip) from an output buffer"); + } + + if (currRecIdx >= recsPerBlock) { + if (!readBlock()) { + return; // UNDONE + } + } + + currRecIdx++; + } + + /** + * Read a record from the input stream and return the data. + * + * @return The record data. + * @throws IOException on error + */ + public byte[] readRecord() throws IOException { + if (debug) { + System.err.println("ReadRecord: recIdx = " + currRecIdx + + " blkIdx = " + currBlkIdx); + } + + if (inStream == null) { + throw new IOException("reading from an output buffer"); + } + + if (currRecIdx >= recsPerBlock) { + if (!readBlock()) { + return null; + } + } + + byte[] result = new byte[recordSize]; + + System.arraycopy(blockBuffer, + (currRecIdx * recordSize), result, 0, + recordSize); + + currRecIdx++; + + return result; + } + + /** + * @return false if End-Of-File, else true + */ + private boolean readBlock() throws IOException { + if (debug) { + System.err.println("ReadBlock: blkIdx = " + currBlkIdx); + } + + if (inStream == null) { + throw new IOException("reading from an output buffer"); + } + + currRecIdx = 0; + + int offset = 0; + int bytesNeeded = blockSize; + + while (bytesNeeded > 0) { + long numBytes = inStream.read(blockBuffer, offset, + bytesNeeded); + + // + // NOTE + // We have fit EOF, and the block is not full! + // + // This is a broken archive. It does not follow the standard + // blocking algorithm. However, because we are generous, and + // it requires little effort, we will simply ignore the error + // and continue as if the entire block were read. This does + // not appear to break anything upstream. We used to return + // false in this case. + // + // Thanks to 'Yohann.Roussel@alcatel.fr' for this fix. + // + if (numBytes == -1) { + if (offset == 0) { + // Ensure that we do not read gigabytes of zeros + // for a corrupt tar file. + // See http://issues.apache.org/bugzilla/show_bug.cgi?id=39924 + return false; + } + // However, just leaving the unread portion of the buffer dirty does + // cause problems in some cases. This problem is described in + // http://issues.apache.org/bugzilla/show_bug.cgi?id=29877 + // + // The solution is to fill the unused portion of the buffer with zeros. + + Arrays.fill(blockBuffer, offset, offset + bytesNeeded, (byte) 0); + + break; + } + + offset += numBytes; + bytesNeeded -= numBytes; + + if (numBytes != blockSize) { + if (debug) { + System.err.println("ReadBlock: INCOMPLETE READ " + + numBytes + " of " + blockSize + + " bytes read."); + } + } + } + + currBlkIdx++; + + return true; + } + + /** + * Get the current block number, zero based. + * + * @return The current zero based block number. + */ + public int getCurrentBlockNum() { + return currBlkIdx; + } + + /** + * Get the current record number, within the current block, zero based. + * Thus, current offset = (currentBlockNum * recsPerBlk) + currentRecNum. + * + * @return The current zero based record number. + */ + public int getCurrentRecordNum() { + return currRecIdx - 1; + } + + /** + * Write an archive record to the archive. + * + * @param record The record data to write to the archive. + * @throws IOException on error + */ + public void writeRecord(byte[] record) throws IOException { + if (debug) { + System.err.println("WriteRecord: recIdx = " + currRecIdx + + " blkIdx = " + currBlkIdx); + } + + if (outStream == null) { + throw new IOException("writing to an input buffer"); + } + + if (record.length != recordSize) { + throw new IOException("record to write has length '" + + record.length + + "' which is not the record size of '" + + recordSize + "'"); + } + + if (currRecIdx >= recsPerBlock) { + writeBlock(); + } + + System.arraycopy(record, 0, blockBuffer, + (currRecIdx * recordSize), + recordSize); + + currRecIdx++; + } + + /** + * Write an archive record to the archive, where the record may be + * inside of a larger array buffer. The buffer must be "offset plus + * record size" long. + * + * @param buf The buffer containing the record data to write. + * @param offset The offset of the record data within buf. + * @throws IOException on error + */ + public void writeRecord(byte[] buf, int offset) throws IOException { + if (debug) { + System.err.println("WriteRecord: recIdx = " + currRecIdx + + " blkIdx = " + currBlkIdx); + } + + if (outStream == null) { + throw new IOException("writing to an input buffer"); + } + + if ((offset + recordSize) > buf.length) { + throw new IOException("record has length '" + buf.length + + "' with offset '" + offset + + "' which is less than the record size of '" + + recordSize + "'"); + } + + if (currRecIdx >= recsPerBlock) { + writeBlock(); + } + + System.arraycopy(buf, offset, blockBuffer, + (currRecIdx * recordSize), + recordSize); + + currRecIdx++; + } + + /** + * Write a TarBuffer block to the archive. + */ + private void writeBlock() throws IOException { + if (debug) { + System.err.println("WriteBlock: blkIdx = " + currBlkIdx); + } + + if (outStream == null) { + throw new IOException("writing to an input buffer"); + } + + outStream.write(blockBuffer, 0, blockSize); + outStream.flush(); + + currRecIdx = 0; + currBlkIdx++; + Arrays.fill(blockBuffer, (byte) 0); + } + + /** + * Flush the current data block if it has any data in it. + */ + void flushBlock() throws IOException { + if (debug) { + System.err.println("TarBuffer.flushBlock() called."); + } + + if (outStream == null) { + throw new IOException("writing to an input buffer"); + } + + if (currRecIdx > 0) { + writeBlock(); + } + } + + /** + * Close the TarBuffer. If this is an output buffer, also flush the + * current block before closing. + * @throws IOException on error + */ + public void close() throws IOException { + if (debug) { + System.err.println("TarBuffer.closeBuffer()."); + } + + if (outStream != null) { + flushBlock(); + + if (outStream != System.out + && outStream != System.err) { + outStream.close(); + + outStream = null; + } + } else if (inStream != null) { + if (inStream != System.in) { + inStream.close(); + + inStream = null; + } + } + } +} Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarConstants.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarConstants.java?rev=1234433&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarConstants.java (added) +++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarConstants.java Sat Jan 21 22:34:28 2012 @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * This package is based on the work done by Timothy Gerard Endres + * (time@ice.com) to whom the Ant project is very grateful for his great code. + */ + +package org.apache.activemq.apollo.util.tar; + +/** + * This interface contains all the definitions used in the package. + * + */ +// CheckStyle:InterfaceIsTypeCheck OFF (bc) +public interface TarConstants { + + /** + * The length of the name field in a header buffer. + */ + int NAMELEN = 100; + + /** + * The length of the mode field in a header buffer. + */ + int MODELEN = 8; + + /** + * The length of the user id field in a header buffer. + */ + int UIDLEN = 8; + + /** + * The length of the group id field in a header buffer. + */ + int GIDLEN = 8; + + /** + * The length of the checksum field in a header buffer. + */ + int CHKSUMLEN = 8; + + /** + * The length of the size field in a header buffer. + */ + int SIZELEN = 12; + + /** + * The maximum size of a file in a tar archive (That's 11 sevens, octal). + */ + long MAXSIZE = 077777777777L; + + /** + * The length of the magic field in a header buffer. + */ + int MAGICLEN = 8; + + /** + * The length of the modification time field in a header buffer. + */ + int MODTIMELEN = 12; + + /** + * The length of the user name field in a header buffer. + */ + int UNAMELEN = 32; + + /** + * The length of the group name field in a header buffer. + */ + int GNAMELEN = 32; + + /** + * The length of the devices field in a header buffer. + */ + int DEVLEN = 8; + + /** + * LF_ constants represent the "link flag" of an entry, or more commonly, + * the "entry type". This is the "old way" of indicating a normal file. + */ + byte LF_OLDNORM = 0; + + /** + * Normal file type. + */ + byte LF_NORMAL = (byte) '0'; + + /** + * Link file type. + */ + byte LF_LINK = (byte) '1'; + + /** + * Symbolic link file type. + */ + byte LF_SYMLINK = (byte) '2'; + + /** + * Character device file type. + */ + byte LF_CHR = (byte) '3'; + + /** + * Block device file type. + */ + byte LF_BLK = (byte) '4'; + + /** + * Directory file type. + */ + byte LF_DIR = (byte) '5'; + + /** + * FIFO (pipe) file type. + */ + byte LF_FIFO = (byte) '6'; + + /** + * Contiguous file type. + */ + byte LF_CONTIG = (byte) '7'; + + /** + * The magic tag representing a POSIX tar archive. + */ + String TMAGIC = "ustar"; + + /** + * The magic tag representing a GNU tar archive. + */ + String GNU_TMAGIC = "ustar "; + + /** + * The namr of the GNU tar entry which contains a long name. + */ + String GNU_LONGLINK = "././@LongLink"; + + /** + * Identifies the *next* file on the tape as having a long name. + */ + byte LF_GNUTYPE_LONGNAME = (byte) 'L'; +} Added: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarEntry.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarEntry.java?rev=1234433&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarEntry.java (added) +++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/tar/TarEntry.java Sat Jan 21 22:34:28 2012 @@ -0,0 +1,664 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * This package is based on the work done by Timothy Gerard Endres + * (time@ice.com) to whom the Ant project is very grateful for his great code. + */ + +package org.apache.activemq.apollo.util.tar; + +import java.io.File; +import java.util.Date; +import java.util.Locale; + +/** + * This class represents an entry in a Tar archive. It consists + * of the entry's header, as well as the entry's File. Entries + * can be instantiated in one of three ways, depending on how + * they are to be used. + *

+ * TarEntries that are created from the header bytes read from + * an archive are instantiated with the TarEntry( byte[] ) + * constructor. These entries will be used when extracting from + * or listing the contents of an archive. These entries have their + * header filled in using the header bytes. They also set the File + * to null, since they reference an archive entry not a file. + *

+ * TarEntries that are created from Files that are to be written + * into an archive are instantiated with the TarEntry( File ) + * constructor. These entries have their header filled in using + * the File's information. They also keep a reference to the File + * for convenience when writing entries. + *

+ * Finally, TarEntries can be constructed from nothing but a name. + * This allows the programmer to construct the entry by hand, for + * instance when only an InputStream is available for writing to + * the archive, and the header information is constructed from + * other information. In this case the header fields are set to + * defaults and the File is set to null. + * + *

+ * The C structure for a Tar Entry's header is: + *

+ * struct header {
+ * char name[NAMSIZ];
+ * char mode[8];
+ * char uid[8];
+ * char gid[8];
+ * char size[12];
+ * char mtime[12];
+ * char chksum[8];
+ * char linkflag;
+ * char linkname[NAMSIZ];
+ * char magic[8];
+ * char uname[TUNMLEN];
+ * char gname[TGNMLEN];
+ * char devmajor[8];
+ * char devminor[8];
+ * } header;
+ * 
+ * + */ + +public class TarEntry implements TarConstants { + /** The entry's name. */ + private StringBuffer name; + + /** The entry's permission mode. */ + private int mode; + + /** The entry's user id. */ + private int userId; + + /** The entry's group id. */ + private int groupId; + + /** The entry's size. */ + private long size; + + /** The entry's modification time. */ + private long modTime; + + /** The entry's link flag. */ + private byte linkFlag; + + /** The entry's link name. */ + private StringBuffer linkName; + + /** The entry's magic tag. */ + private StringBuffer magic; + + /** The entry's user name. */ + private StringBuffer userName; + + /** The entry's group name. */ + private StringBuffer groupName; + + /** The entry's major device number. */ + private int devMajor; + + /** The entry's minor device number. */ + private int devMinor; + + /** The entry's file reference */ + private File file; + + /** Maximum length of a user's name in the tar file */ + public static final int MAX_NAMELEN = 31; + + /** Default permissions bits for directories */ + public static final int DEFAULT_DIR_MODE = 040755; + + /** Default permissions bits for files */ + public static final int DEFAULT_FILE_MODE = 0100644; + + /** Convert millis to seconds */ + public static final int MILLIS_PER_SECOND = 1000; + + /** + * Construct an empty entry and prepares the header values. + */ + private TarEntry () { + this.magic = new StringBuffer(TMAGIC); + this.name = new StringBuffer(); + this.linkName = new StringBuffer(); + + String user = System.getProperty("user.name", ""); + + if (user.length() > MAX_NAMELEN) { + user = user.substring(0, MAX_NAMELEN); + } + + this.userId = 0; + this.groupId = 0; + this.userName = new StringBuffer(user); + this.groupName = new StringBuffer(""); + this.file = null; + } + + /** + * Construct an entry with only a name. This allows the programmer + * to construct the entry's header "by hand". File is set to null. + * + * @param name the entry name + */ + public TarEntry(String name) { + this(name, false); + } + + /** + * Construct an entry with only a name. This allows the programmer + * to construct the entry's header "by hand". File is set to null. + * + * @param name the entry name + * @param preserveLeadingSlashes whether to allow leading slashes + * in the name. + */ + public TarEntry(String name, boolean preserveLeadingSlashes) { + this(); + + name = normalizeFileName(name, preserveLeadingSlashes); + boolean isDir = name.endsWith("/"); + + this.devMajor = 0; + this.devMinor = 0; + this.name = new StringBuffer(name); + this.mode = isDir ? DEFAULT_DIR_MODE : DEFAULT_FILE_MODE; + this.linkFlag = isDir ? LF_DIR : LF_NORMAL; + this.userId = 0; + this.groupId = 0; + this.size = 0; + this.modTime = (new Date()).getTime() / MILLIS_PER_SECOND; + this.linkName = new StringBuffer(""); + this.userName = new StringBuffer(""); + this.groupName = new StringBuffer(""); + this.devMajor = 0; + this.devMinor = 0; + + } + + /** + * Construct an entry with a name and a link flag. + * + * @param name the entry name + * @param linkFlag the entry link flag. + */ + public TarEntry(String name, byte linkFlag) { + this(name); + this.linkFlag = linkFlag; + if (linkFlag == LF_GNUTYPE_LONGNAME) { + magic = new StringBuffer(GNU_TMAGIC); + } + } + + /** + * Construct an entry for a file. File is set to file, and the + * header is constructed from information from the file. + * + * @param file The file that the entry represents. + */ + public TarEntry(File file) { + this(); + + this.file = file; + + String fileName = normalizeFileName(file.getPath(), false); + this.linkName = new StringBuffer(""); + this.name = new StringBuffer(fileName); + + if (file.isDirectory()) { + this.mode = DEFAULT_DIR_MODE; + this.linkFlag = LF_DIR; + + int nameLength = name.length(); + if (nameLength == 0 || name.charAt(nameLength - 1) != '/') { + this.name.append("/"); + } + this.size = 0; + } else { + this.mode = DEFAULT_FILE_MODE; + this.linkFlag = LF_NORMAL; + this.size = file.length(); + } + + this.modTime = file.lastModified() / MILLIS_PER_SECOND; + this.devMajor = 0; + this.devMinor = 0; + } + + /** + * Construct an entry from an archive's header bytes. File is set + * to null. + * + * @param headerBuf The header bytes from a tar archive entry. + */ + public TarEntry(byte[] headerBuf) { + this(); + parseTarHeader(headerBuf); + } + + /** + * Determine if the two entries are equal. Equality is determined + * by the header names being equal. + * + * @param it Entry to be checked for equality. + * @return True if the entries are equal. + */ + public boolean equals(TarEntry it) { + return getName().equals(it.getName()); + } + + /** + * Determine if the two entries are equal. Equality is determined + * by the header names being equal. + * + * @param it Entry to be checked for equality. + * @return True if the entries are equal. + */ + public boolean equals(Object it) { + if (it == null || getClass() != it.getClass()) { + return false; + } + return equals((TarEntry) it); + } + + /** + * Hashcodes are based on entry names. + * + * @return the entry hashcode + */ + public int hashCode() { + return getName().hashCode(); + } + + /** + * Determine if the given entry is a descendant of this entry. + * Descendancy is determined by the name of the descendant + * starting with this entry's name. + * + * @param desc Entry to be checked as a descendent of this. + * @return True if entry is a descendant of this. + */ + public boolean isDescendent(TarEntry desc) { + return desc.getName().startsWith(getName()); + } + + /** + * Get this entry's name. + * + * @return This entry's name. + */ + public String getName() { + return name.toString(); + } + + /** + * Set this entry's name. + * + * @param name This entry's new name. + */ + public void setName(String name) { + this.name = new StringBuffer(normalizeFileName(name, false)); + } + + /** + * Set the mode for this entry + * + * @param mode the mode for this entry + */ + public void setMode(int mode) { + this.mode = mode; + } + + /** + * Get this entry's link name. + * + * @return This entry's link name. + */ + public String getLinkName() { + return linkName.toString(); + } + + /** + * Get this entry's user id. + * + * @return This entry's user id. + */ + public int getUserId() { + return userId; + } + + /** + * Set this entry's user id. + * + * @param userId This entry's new user id. + */ + public void setUserId(int userId) { + this.userId = userId; + } + + /** + * Get this entry's group id. + * + * @return This entry's group id. + */ + public int getGroupId() { + return groupId; + } + + /** + * Set this entry's group id. + * + * @param groupId This entry's new group id. + */ + public void setGroupId(int groupId) { + this.groupId = groupId; + } + + /** + * Get this entry's user name. + * + * @return This entry's user name. + */ + public String getUserName() { + return userName.toString(); + } + + /** + * Set this entry's user name. + * + * @param userName This entry's new user name. + */ + public void setUserName(String userName) { + this.userName = new StringBuffer(userName); + } + + /** + * Get this entry's group name. + * + * @return This entry's group name. + */ + public String getGroupName() { + return groupName.toString(); + } + + /** + * Set this entry's group name. + * + * @param groupName This entry's new group name. + */ + public void setGroupName(String groupName) { + this.groupName = new StringBuffer(groupName); + } + + /** + * Convenience method to set this entry's group and user ids. + * + * @param userId This entry's new user id. + * @param groupId This entry's new group id. + */ + public void setIds(int userId, int groupId) { + setUserId(userId); + setGroupId(groupId); + } + + /** + * Convenience method to set this entry's group and user names. + * + * @param userName This entry's new user name. + * @param groupName This entry's new group name. + */ + public void setNames(String userName, String groupName) { + setUserName(userName); + setGroupName(groupName); + } + + /** + * Set this entry's modification time. The parameter passed + * to this method is in "Java time". + * + * @param time This entry's new modification time. + */ + public void setModTime(long time) { + modTime = time / MILLIS_PER_SECOND; + } + + /** + * Set this entry's modification time. + * + * @param time This entry's new modification time. + */ + public void setModTime(Date time) { + modTime = time.getTime() / MILLIS_PER_SECOND; + } + + /** + * Set this entry's modification time. + * + * @return time This entry's new modification time. + */ + public Date getModTime() { + return new Date(modTime * MILLIS_PER_SECOND); + } + + /** + * Get this entry's file. + * + * @return This entry's file. + */ + public File getFile() { + return file; + } + + /** + * Get this entry's mode. + * + * @return This entry's mode. + */ + public int getMode() { + return mode; + } + + /** + * Get this entry's file size. + * + * @return This entry's file size. + */ + public long getSize() { + return size; + } + + /** + * Set this entry's file size. + * + * @param size This entry's new file size. + */ + public void setSize(long size) { + this.size = size; + } + + + /** + * Indicate if this entry is a GNU long name block + * + * @return true if this is a long name extension provided by GNU tar + */ + public boolean isGNULongNameEntry() { + return linkFlag == LF_GNUTYPE_LONGNAME + && name.toString().equals(GNU_LONGLINK); + } + + /** + * Return whether or not this entry represents a directory. + * + * @return True if this entry is a directory. + */ + public boolean isDirectory() { + if (file != null) { + return file.isDirectory(); + } + + if (linkFlag == LF_DIR) { + return true; + } + + if (getName().endsWith("/")) { + return true; + } + + return false; + } + + /** + * If this entry represents a file, and the file is a directory, return + * an array of TarEntries for this entry's children. + * + * @return An array of TarEntry's for this entry's children. + */ + public TarEntry[] getDirectoryEntries() { + if (file == null || !file.isDirectory()) { + return new TarEntry[0]; + } + + String[] list = file.list(); + TarEntry[] result = new TarEntry[list.length]; + + for (int i = 0; i < list.length; ++i) { + result[i] = new TarEntry(new File(file, list[i])); + } + + return result; + } + + /** + * Write an entry's header information to a header buffer. + * + * @param outbuf The tar entry header buffer to fill in. + */ + public void writeEntryHeader(byte[] outbuf) { + int offset = 0; + + offset = TarUtils.getNameBytes(name, outbuf, offset, NAMELEN); + offset = TarUtils.getOctalBytes(mode, outbuf, offset, MODELEN); + offset = TarUtils.getOctalBytes(userId, outbuf, offset, UIDLEN); + offset = TarUtils.getOctalBytes(groupId, outbuf, offset, GIDLEN); + offset = TarUtils.getLongOctalBytes(size, outbuf, offset, SIZELEN); + offset = TarUtils.getLongOctalBytes(modTime, outbuf, offset, MODTIMELEN); + + int csOffset = offset; + + for (int c = 0; c < CHKSUMLEN; ++c) { + outbuf[offset++] = (byte) ' '; + } + + outbuf[offset++] = linkFlag; + offset = TarUtils.getNameBytes(linkName, outbuf, offset, NAMELEN); + offset = TarUtils.getNameBytes(magic, outbuf, offset, MAGICLEN); + offset = TarUtils.getNameBytes(userName, outbuf, offset, UNAMELEN); + offset = TarUtils.getNameBytes(groupName, outbuf, offset, GNAMELEN); + offset = TarUtils.getOctalBytes(devMajor, outbuf, offset, DEVLEN); + offset = TarUtils.getOctalBytes(devMinor, outbuf, offset, DEVLEN); + + while (offset < outbuf.length) { + outbuf[offset++] = 0; + } + + long chk = TarUtils.computeCheckSum(outbuf); + + TarUtils.getCheckSumOctalBytes(chk, outbuf, csOffset, CHKSUMLEN); + } + + /** + * Parse an entry's header information from a header buffer. + * + * @param header The tar entry header buffer to get information from. + */ + public void parseTarHeader(byte[] header) { + int offset = 0; + + name = TarUtils.parseName(header, offset, NAMELEN); + offset += NAMELEN; + mode = (int) TarUtils.parseOctal(header, offset, MODELEN); + offset += MODELEN; + userId = (int) TarUtils.parseOctal(header, offset, UIDLEN); + offset += UIDLEN; + groupId = (int) TarUtils.parseOctal(header, offset, GIDLEN); + offset += GIDLEN; + size = TarUtils.parseOctal(header, offset, SIZELEN); + offset += SIZELEN; + modTime = TarUtils.parseOctal(header, offset, MODTIMELEN); + offset += MODTIMELEN; + offset += CHKSUMLEN; + linkFlag = header[offset++]; + linkName = TarUtils.parseName(header, offset, NAMELEN); + offset += NAMELEN; + magic = TarUtils.parseName(header, offset, MAGICLEN); + offset += MAGICLEN; + userName = TarUtils.parseName(header, offset, UNAMELEN); + offset += UNAMELEN; + groupName = TarUtils.parseName(header, offset, GNAMELEN); + offset += GNAMELEN; + devMajor = (int) TarUtils.parseOctal(header, offset, DEVLEN); + offset += DEVLEN; + devMinor = (int) TarUtils.parseOctal(header, offset, DEVLEN); + } + + /** + * Strips Windows' drive letter as well as any leading slashes, + * turns path separators into forward slahes. + */ + private static String normalizeFileName(String fileName, + boolean preserveLeadingSlashes) { + String osname = System.getProperty("os.name").toLowerCase(Locale.ENGLISH); + + if (osname != null) { + + // Strip off drive letters! + // REVIEW Would a better check be "(File.separator == '\')"? + + if (osname.startsWith("windows")) { + if (fileName.length() > 2) { + char ch1 = fileName.charAt(0); + char ch2 = fileName.charAt(1); + + if (ch2 == ':' + && ((ch1 >= 'a' && ch1 <= 'z') + || (ch1 >= 'A' && ch1 <= 'Z'))) { + fileName = fileName.substring(2); + } + } + } else if (osname.indexOf("netware") > -1) { + int colon = fileName.indexOf(':'); + if (colon != -1) { + fileName = fileName.substring(colon + 1); + } + } + } + + fileName = fileName.replace(File.separatorChar, '/'); + + // No absolute pathnames + // Windows (and Posix?) paths can start with "\\NetworkDrive\", + // so we loop on starting /'s. + while (!preserveLeadingSlashes && fileName.startsWith("/")) { + fileName = fileName.substring(1); + } + return fileName; + } +}