Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6930997F6 for ; Fri, 26 Oct 2012 20:29:38 +0000 (UTC) Received: (qmail 49775 invoked by uid 500); 26 Oct 2012 20:29:38 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 49741 invoked by uid 500); 26 Oct 2012 20:29:38 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 49733 invoked by uid 99); 26 Oct 2012 20:29:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Oct 2012 20:29:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Oct 2012 20:29:34 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 31FA22388AA6 for ; Fri, 26 Oct 2012 20:28:51 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1402652 [1/2] - in /activemq/trunk/activemq-console: ./ src/main/java/org/apache/activemq/console/command/ src/main/java/org/apache/activemq/console/command/store/ src/main/java/org/apache/activemq/console/command/store/tar/ src/main/proto/ Date: Fri, 26 Oct 2012 20:28:50 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121026202851.31FA22388AA6@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Fri Oct 26 20:28:49 2012 New Revision: 
1402652 URL: http://svn.apache.org/viewvc?rev=1402652&view=rev Log: Implementing AMQ-4137 : Create a store import/export command line tool to convert between store types Usage is as simple as "bin/activemq export --file=archive.tgz". All data is stored in the tgz in the format defined by the Apollo store export/import utilities. Only export implemented at this time. I have verified that Apollo can import queues exported from ActiveMQ. Durable subs probably need a little more work. Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarOutputStream.java activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarUtils.java activemq/trunk/activemq-console/src/main/proto/ activemq/trunk/activemq-console/src/main/proto/data.proto Modified: activemq/trunk/activemq-console/pom.xml activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java Modified: activemq/trunk/activemq-console/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/pom.xml?rev=1402652&r1=1402651&r2=1402652&view=diff 
============================================================================== --- activemq/trunk/activemq-console/pom.xml (original) +++ activemq/trunk/activemq-console/pom.xml Fri Oct 26 20:28:49 2012 @@ -77,8 +77,28 @@ xbean-spring + + org.fusesource.hawtbuf + hawtbuf-proto + ${hawtbuf-version} + + + org.slf4j + slf4j-api + + + org.codehaus.jackson + jackson-core-asl + ${jackson-version} + + + org.codehaus.jackson + jackson-mapper-asl + ${jackson-version} + - + + org.springframework spring-context @@ -173,6 +193,22 @@ + + + org.fusesource.hawtbuf + hawtbuf-protoc + ${hawtbuf-version} + + alt + + + + + compile + + + + Modified: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java?rev=1402652&r1=1402651&r2=1402652&view=diff ============================================================================== --- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java (original) +++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/ShellCommand.java Fri Oct 26 20:28:49 2012 @@ -48,6 +48,7 @@ public class ShellCommand extends Abstra " query - Display selected broker component's attributes and statistics.", " browse - Display selected messages in a specified destination.", " journal-audit - Allows you to view records stored in the persistent journal.", + " export - Exports a stopped brokers data files to an archive file", " purge - Delete selected destination's messages that matches the message selector", " encrypt - Encrypts given text", " decrypt - Decrypts given text", @@ -137,6 +138,8 @@ public class ShellCommand extends Abstra command = new EncryptCommand(); } else if (taskToken.equals("decrypt")) { command = new DecryptCommand(); + } else if (taskToken.equals("export")) { + command = new StoreExportCommand(); } else 
if (taskToken.equals("help")) { printHelp(); } else { Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java?rev=1402652&view=auto ============================================================================== --- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java (added) +++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/StoreExportCommand.java Fri Oct 26 20:28:49 2012 @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.console.command; + +import org.apache.activemq.console.CommandContext; +import org.apache.activemq.console.command.store.StoreExporter; +import org.apache.activemq.console.command.store.amq.CommandLineSupport; + +import java.util.Arrays; +import java.util.List; + +/** + * @author Hiram Chirino + */ +public class StoreExportCommand implements Command { + + private CommandContext context; + + @Override + public void setCommandContext(CommandContext context) { + this.context = context; + } + + @Override + public void execute(List tokens) throws Exception { + StoreExporter exporter = new StoreExporter(); + String[] remaining = CommandLineSupport.setOptions(exporter, tokens.toArray(new String[tokens.size()])); + if (remaining.length > 0) { + throw new Exception("Unexpected arguments: " + Arrays.asList(remaining)); + } + exporter.execute(); + } +} Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java?rev=1402652&view=auto ============================================================================== --- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java (added) +++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/ExportStreamManager.java Fri Oct 26 20:28:49 2012 @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console.command.store; + +import org.apache.activemq.console.command.store.proto.*; +import org.apache.activemq.console.command.store.tar.TarEntry; +import org.apache.activemq.console.command.store.tar.TarOutputStream; +import org.fusesource.hawtbuf.AsciiBuffer; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.proto.MessageBuffer; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.zip.GZIPOutputStream; + +/** + * @author Hiram Chirino + */ +public class ExportStreamManager { + + private final OutputStream target; + private final int version; + TarOutputStream stream; + + ExportStreamManager(OutputStream target, int version) throws IOException { + this.target = target; + this.version = version; + stream = new TarOutputStream(new GZIPOutputStream(target)); + store("ver", new AsciiBuffer(""+version)); + } + + + long seq = 0; + + public void finish() throws IOException { + stream.close(); + } + + private void store(String ext, Buffer value) throws IOException { + TarEntry entry = new TarEntry(seq + "." + ext); + seq += 1; + entry.setSize(value.length()); + stream.putNextEntry(entry); + value.writeTo(stream); + stream.closeEntry(); + } + + private void store(String ext, MessageBuffer value) throws IOException { + TarEntry entry = new TarEntry(seq + "." 
+ ext); + seq += 1; + entry.setSize(value.serializedSizeFramed()); + stream.putNextEntry(entry); + value.writeFramed(stream); + stream.closeEntry(); + } + + + public void store_queue(QueuePB.Getter value) throws IOException { + store("que", value.freeze()); + } + public void store_queue_entry(QueueEntryPB.Getter value) throws IOException { + store("qen", value.freeze()); + } + public void store_message(MessagePB.Getter value) throws IOException { + store("msg", value.freeze()); + } + public void store_map_entry(MapEntryPB.Getter value) throws IOException { + store("map", value.freeze()); + } + +} Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java?rev=1402652&view=auto ============================================================================== --- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java (added) +++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java Fri Oct 26 20:28:49 2012 @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.console.command.store; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.*; +import org.apache.activemq.console.command.store.proto.MessagePB; +import org.apache.activemq.console.command.store.proto.QueueEntryPB; +import org.apache.activemq.console.command.store.proto.QueuePB; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.store.*; +import org.apache.activemq.xbean.XBeanBrokerFactory; +import org.codehaus.jackson.map.ObjectMapper; +import org.fusesource.hawtbuf.AsciiBuffer; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; +import org.fusesource.hawtbuf.UTF8Buffer; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; + +/** + * @author Hiram Chirino + */ +public class StoreExporter { + + URI config; + File file; + + public StoreExporter() throws URISyntaxException { + config = new URI("xbean:activemq.xml"); + } + + public void execute() throws Exception { + if (config == null) { + throw new Exception("required --config option missing"); + } + if (file == null) { + throw new Exception("required --file option missing"); + } + System.out.println("Loading: " + config); + XBeanBrokerFactory.setStartDefault(false); // to avoid the broker auto-starting.. 
+ BrokerService broker = BrokerFactory.createBroker(config); + XBeanBrokerFactory.resetStartDefault(); + PersistenceAdapter store = broker.getPersistenceAdapter(); + System.out.println("Starting: " + store); + store.start(); + try { + BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(file)); + try { + export(store, fos); + } finally { + fos.close(); + } + } finally { + store.stop(); + } + } + + static final int OPENWIRE_VERSION = 8; + static final boolean TIGHT_ENCODING = false; + + void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception { + + ObjectMapper mapper = new ObjectMapper(); + final AsciiBuffer ds_kind = new AsciiBuffer("ds"); + final AsciiBuffer ptp_kind = new AsciiBuffer("ptp"); + final AsciiBuffer codec_id = new AsciiBuffer("openwire"); + final OpenWireFormat wireformat = new OpenWireFormat(); + wireformat.setCacheEnabled(false); + wireformat.setTightEncodingEnabled(TIGHT_ENCODING); + wireformat.setVersion(OPENWIRE_VERSION); + + final long[] messageKeyCounter = new long[]{0}; + final long[] containerKeyCounter = new long[]{0}; + final ExportStreamManager manager = new ExportStreamManager(fos, 1); + + + final int[] preparedTxs = new int[]{0}; + store.createTransactionStore().recover(new TransactionRecoveryListener() { + public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) { + preparedTxs[0] += 1; + } + }); + + if (preparedTxs[0] > 0) { + throw new Exception("Cannot export a store with prepared XA transactions. 
Please commit or rollback those transactions before attempting to export."); + } + + for (ActiveMQDestination odest : store.getDestinations()) { + containerKeyCounter[0]++; + if (odest instanceof ActiveMQQueue) { + ActiveMQQueue dest = (ActiveMQQueue) odest; + MessageStore queue = store.createQueueMessageStore(dest); + + QueuePB.Bean destRecord = new QueuePB.Bean(); + destRecord.setKey(containerKeyCounter[0]); + destRecord.setBindingKind(ptp_kind); + + final long[] seqKeyCounter = new long[]{0}; + + HashMap jsonMap = new HashMap(); + jsonMap.put("@class", "queue_destination"); + jsonMap.put("name", dest.getQueueName()); + String json = mapper.writeValueAsString(jsonMap); + System.out.println(json); + destRecord.setBindingData(new UTF8Buffer(json)); + manager.store_queue(destRecord); + + queue.recover(new MessageRecoveryListener() { + public boolean hasSpace() { + return true; + } + + public boolean recoverMessageReference(MessageId ref) throws Exception { + return true; + } + + public boolean isDuplicate(MessageId ref) { + return false; + } + + public boolean recoverMessage(Message message) throws IOException { + messageKeyCounter[0]++; + seqKeyCounter[0]++; + + DataByteArrayOutputStream mos = new DataByteArrayOutputStream(); + mos.writeBoolean(TIGHT_ENCODING); + mos.writeVarInt(OPENWIRE_VERSION); + wireformat.marshal(message, mos); + + MessagePB.Bean messageRecord = new MessagePB.Bean(); + messageRecord.setCodec(codec_id); + messageRecord.setMessageKey(messageKeyCounter[0]); + messageRecord.setSize(message.getSize()); + messageRecord.setValue(mos.toBuffer()); + // record.setCompression() + manager.store_message(messageRecord); + + QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean(); + entryRecord.setQueueKey(containerKeyCounter[0]); + entryRecord.setQueueSeq(seqKeyCounter[0]); + entryRecord.setMessageKey(messageKeyCounter[0]); + entryRecord.setSize(message.getSize()); + if (message.getExpiration() != 0) { + entryRecord.setExpiration(message.getExpiration()); + 
} + if (message.getRedeliveryCounter() != 0) { + entryRecord.setRedeliveries(message.getRedeliveryCounter()); + } + manager.store_queue_entry(entryRecord); + return true; + } + }); + + } else if (odest instanceof ActiveMQTopic) { + ActiveMQTopic dest = (ActiveMQTopic) odest; + + TopicMessageStore topic = store.createTopicMessageStore(dest); + for (SubscriptionInfo sub : topic.getAllSubscriptions()) { + + QueuePB.Bean destRecord = new QueuePB.Bean(); + destRecord.setKey(containerKeyCounter[0]); + destRecord.setBindingKind(ds_kind); + + // TODO: use a real JSON encoder like jackson. + HashMap jsonMap = new HashMap(); + jsonMap.put("@class", "dsub_destination"); + jsonMap.put("name", sub.getClientId() + ":" + sub.getSubcriptionName()); + HashMap jsonTopic = new HashMap(); + jsonTopic.put("name", dest.getTopicName()); + jsonMap.put("topics", new Object[]{jsonTopic}); + if (sub.getSelector() != null) { + jsonMap.put("selector", sub.getSelector()); + } + String json = mapper.writeValueAsString(jsonMap); + System.out.println(json); + + destRecord.setBindingData(new UTF8Buffer(json)); + manager.store_queue(destRecord); + + final long seqKeyCounter[] = new long[]{0}; + topic.recoverSubscription(sub.getClientId(), sub.getSubcriptionName(), new MessageRecoveryListener() { + public boolean hasSpace() { + return true; + } + + public boolean recoverMessageReference(MessageId ref) throws Exception { + return true; + } + + public boolean isDuplicate(MessageId ref) { + return false; + } + + public boolean recoverMessage(Message message) throws IOException { + messageKeyCounter[0]++; + seqKeyCounter[0]++; + + DataByteArrayOutputStream mos = new DataByteArrayOutputStream(); + mos.writeBoolean(TIGHT_ENCODING); + mos.writeVarInt(OPENWIRE_VERSION); + // Marshal the recovered message into mos, the same way the queue branch does. + wireformat.marshal(message, mos); + + MessagePB.Bean messageRecord = new MessagePB.Bean(); + messageRecord.setCodec(codec_id); + messageRecord.setMessageKey(messageKeyCounter[0]); + messageRecord.setSize(message.getSize()); + 
messageRecord.setValue(mos.toBuffer()); + // record.setCompression() + manager.store_message(messageRecord); + + QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean(); + entryRecord.setQueueKey(containerKeyCounter[0]); + entryRecord.setQueueSeq(seqKeyCounter[0]); + entryRecord.setMessageKey(messageKeyCounter[0]); + entryRecord.setSize(message.getSize()); + if (message.getExpiration() != 0) { + entryRecord.setExpiration(message.getExpiration()); + } + if (message.getRedeliveryCounter() != 0) { + entryRecord.setRedeliveries(message.getRedeliveryCounter()); + } + manager.store_queue_entry(entryRecord); + return true; + } + }); + + } + } + } + manager.finish(); + } + + public File getFile() { + return file; + } + + public void setFile(String file) { + setFile(new File(file)); + } + + public void setFile(File file) { + this.file = file; + } + + public URI getConfig() { + return config; + } + + public void setConfig(URI config) { + this.config = config; + } +} Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java?rev=1402652&view=auto ============================================================================== --- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java (added) +++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarBuffer.java Fri Oct 26 20:28:49 2012 @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * This package is based on the work done by Timothy Gerard Endres + * (time@ice.com) to whom the Ant project is very grateful for his great code. + */ + +package org.apache.activemq.console.command.store.tar; + +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.util.Arrays; + +/** + * The TarBuffer class implements the tar archive concept + * of a buffered input stream. This concept goes back to the + * days of blocked tape drives and special io devices. In the + * Java universe, the only real function that this class + * performs is to ensure that files have the correct "block" + * size, or other tars will complain. + *

+ * You should never have a need to access this class directly. + * TarBuffers are created by Tar IO Streams. + * + */ + +public class TarBuffer { + + /** Default record size */ + public static final int DEFAULT_RCDSIZE = (512); + + /** Default block size */ + public static final int DEFAULT_BLKSIZE = (DEFAULT_RCDSIZE * 20); + + private InputStream inStream; + private OutputStream outStream; + private byte[] blockBuffer; + private int currBlkIdx; + private int currRecIdx; + private int blockSize; + private int recordSize; + private int recsPerBlock; + private boolean debug; + + /** + * Constructor for a TarBuffer on an input stream. + * @param inStream the input stream to use + */ + public TarBuffer(InputStream inStream) { + this(inStream, TarBuffer.DEFAULT_BLKSIZE); + } + + /** + * Constructor for a TarBuffer on an input stream. + * @param inStream the input stream to use + * @param blockSize the block size to use + */ + public TarBuffer(InputStream inStream, int blockSize) { + this(inStream, blockSize, TarBuffer.DEFAULT_RCDSIZE); + } + + /** + * Constructor for a TarBuffer on an input stream. + * @param inStream the input stream to use + * @param blockSize the block size to use + * @param recordSize the record size to use + */ + public TarBuffer(InputStream inStream, int blockSize, int recordSize) { + this.inStream = inStream; + this.outStream = null; + + this.initialize(blockSize, recordSize); + } + + /** + * Constructor for a TarBuffer on an output stream. + * @param outStream the output stream to use + */ + public TarBuffer(OutputStream outStream) { + this(outStream, TarBuffer.DEFAULT_BLKSIZE); + } + + /** + * Constructor for a TarBuffer on an output stream. + * @param outStream the output stream to use + * @param blockSize the block size to use + */ + public TarBuffer(OutputStream outStream, int blockSize) { + this(outStream, blockSize, TarBuffer.DEFAULT_RCDSIZE); + } + + /** + * Constructor for a TarBuffer on an output stream. 
+ * @param outStream the output stream to use + * @param blockSize the block size to use + * @param recordSize the record size to use + */ + public TarBuffer(OutputStream outStream, int blockSize, int recordSize) { + this.inStream = null; + this.outStream = outStream; + + this.initialize(blockSize, recordSize); + } + + /** + * Initialization common to all constructors. + */ + private void initialize(int blockSize, int recordSize) { + this.debug = false; + this.blockSize = blockSize; + this.recordSize = recordSize; + this.recsPerBlock = (this.blockSize / this.recordSize); + this.blockBuffer = new byte[this.blockSize]; + + if (this.inStream != null) { + this.currBlkIdx = -1; + this.currRecIdx = this.recsPerBlock; + } else { + this.currBlkIdx = 0; + this.currRecIdx = 0; + } + } + + /** + * Get the TAR Buffer's block size. Blocks consist of multiple records. + * @return the block size + */ + public int getBlockSize() { + return this.blockSize; + } + + /** + * Get the TAR Buffer's record size. + * @return the record size + */ + public int getRecordSize() { + return this.recordSize; + } + + /** + * Set the debugging flag for the buffer. + * + * @param debug If true, print debugging output. + */ + public void setDebug(boolean debug) { + this.debug = debug; + } + + /** + * Determine if an archive record indicate End of Archive. End of + * archive is indicated by a record that consists entirely of null bytes. + * + * @param record The record data to check. + * @return true if the record data is an End of Archive + */ + public boolean isEOFRecord(byte[] record) { + for (int i = 0, sz = getRecordSize(); i < sz; ++i) { + if (record[i] != 0) { + return false; + } + } + + return true; + } + + /** + * Skip over a record on the input stream. 
+ * @throws IOException on error + */ + public void skipRecord() throws IOException { + if (debug) { + System.err.println("SkipRecord: recIdx = " + currRecIdx + + " blkIdx = " + currBlkIdx); + } + + if (inStream == null) { + throw new IOException("reading (via skip) from an output buffer"); + } + + if (currRecIdx >= recsPerBlock) { + if (!readBlock()) { + return; // UNDONE + } + } + + currRecIdx++; + } + + /** + * Read a record from the input stream and return the data. + * + * @return The record data. + * @throws IOException on error + */ + public byte[] readRecord() throws IOException { + if (debug) { + System.err.println("ReadRecord: recIdx = " + currRecIdx + + " blkIdx = " + currBlkIdx); + } + + if (inStream == null) { + throw new IOException("reading from an output buffer"); + } + + if (currRecIdx >= recsPerBlock) { + if (!readBlock()) { + return null; + } + } + + byte[] result = new byte[recordSize]; + + System.arraycopy(blockBuffer, + (currRecIdx * recordSize), result, 0, + recordSize); + + currRecIdx++; + + return result; + } + + /** + * @return false if End-Of-File, else true + */ + private boolean readBlock() throws IOException { + if (debug) { + System.err.println("ReadBlock: blkIdx = " + currBlkIdx); + } + + if (inStream == null) { + throw new IOException("reading from an output buffer"); + } + + currRecIdx = 0; + + int offset = 0; + int bytesNeeded = blockSize; + + while (bytesNeeded > 0) { + long numBytes = inStream.read(blockBuffer, offset, + bytesNeeded); + + // + // NOTE + // We have fit EOF, and the block is not full! + // + // This is a broken archive. It does not follow the standard + // blocking algorithm. However, because we are generous, and + // it requires little effort, we will simply ignore the error + // and continue as if the entire block were read. This does + // not appear to break anything upstream. We used to return + // false in this case. + // + // Thanks to 'Yohann.Roussel@alcatel.fr' for this fix. 
+ // + if (numBytes == -1) { + if (offset == 0) { + // Ensure that we do not read gigabytes of zeros + // for a corrupt tar file. + // See http://issues.apache.org/bugzilla/show_bug.cgi?id=39924 + return false; + } + // However, just leaving the unread portion of the buffer dirty does + // cause problems in some cases. This problem is described in + // http://issues.apache.org/bugzilla/show_bug.cgi?id=29877 + // + // The solution is to fill the unused portion of the buffer with zeros. + + Arrays.fill(blockBuffer, offset, offset + bytesNeeded, (byte) 0); + + break; + } + + offset += numBytes; + bytesNeeded -= numBytes; + + if (numBytes != blockSize) { + if (debug) { + System.err.println("ReadBlock: INCOMPLETE READ " + + numBytes + " of " + blockSize + + " bytes read."); + } + } + } + + currBlkIdx++; + + return true; + } + + /** + * Get the current block number, zero based. + * + * @return The current zero based block number. + */ + public int getCurrentBlockNum() { + return currBlkIdx; + } + + /** + * Get the current record number, within the current block, zero based. + * Thus, current offset = (currentBlockNum * recsPerBlk) + currentRecNum. + * + * @return The current zero based record number. + */ + public int getCurrentRecordNum() { + return currRecIdx - 1; + } + + /** + * Write an archive record to the archive. + * + * @param record The record data to write to the archive. 
+ * @throws IOException on error + */ + public void writeRecord(byte[] record) throws IOException { + if (debug) { + System.err.println("WriteRecord: recIdx = " + currRecIdx + + " blkIdx = " + currBlkIdx); + } + + if (outStream == null) { + throw new IOException("writing to an input buffer"); + } + + if (record.length != recordSize) { + throw new IOException("record to write has length '" + + record.length + + "' which is not the record size of '" + + recordSize + "'"); + } + + if (currRecIdx >= recsPerBlock) { + writeBlock(); + } + + System.arraycopy(record, 0, blockBuffer, + (currRecIdx * recordSize), + recordSize); + + currRecIdx++; + } + + /** + * Write an archive record to the archive, where the record may be + * inside of a larger array buffer. The buffer must be "offset plus + * record size" long. + * + * @param buf The buffer containing the record data to write. + * @param offset The offset of the record data within buf. + * @throws IOException on error + */ + public void writeRecord(byte[] buf, int offset) throws IOException { + if (debug) { + System.err.println("WriteRecord: recIdx = " + currRecIdx + + " blkIdx = " + currBlkIdx); + } + + if (outStream == null) { + throw new IOException("writing to an input buffer"); + } + + if ((offset + recordSize) > buf.length) { + throw new IOException("record has length '" + buf.length + + "' with offset '" + offset + + "' which is less than the record size of '" + + recordSize + "'"); + } + + if (currRecIdx >= recsPerBlock) { + writeBlock(); + } + + System.arraycopy(buf, offset, blockBuffer, + (currRecIdx * recordSize), + recordSize); + + currRecIdx++; + } + + /** + * Write a TarBuffer block to the archive. 
+ */ + private void writeBlock() throws IOException { + if (debug) { + System.err.println("WriteBlock: blkIdx = " + currBlkIdx); + } + + if (outStream == null) { + throw new IOException("writing to an input buffer"); + } + + outStream.write(blockBuffer, 0, blockSize); + outStream.flush(); + + currRecIdx = 0; + currBlkIdx++; + Arrays.fill(blockBuffer, (byte) 0); + } + + /** + * Flush the current data block if it has any data in it. + */ + void flushBlock() throws IOException { + if (debug) { + System.err.println("TarBuffer.flushBlock() called."); + } + + if (outStream == null) { + throw new IOException("writing to an input buffer"); + } + + if (currRecIdx > 0) { + writeBlock(); + } + } + + /** + * Close the TarBuffer. If this is an output buffer, also flush the + * current block before closing. + * @throws IOException on error + */ + public void close() throws IOException { + if (debug) { + System.err.println("TarBuffer.closeBuffer()."); + } + + if (outStream != null) { + flushBlock(); + + if (outStream != System.out + && outStream != System.err) { + outStream.close(); + + outStream = null; + } + } else if (inStream != null) { + if (inStream != System.in) { + inStream.close(); + + inStream = null; + } + } + } +} Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java?rev=1402652&view=auto ============================================================================== --- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java (added) +++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarConstants.java Fri Oct 26 20:28:49 2012 @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * This package is based on the work done by Timothy Gerard Endres + * (time@ice.com) to whom the Ant project is very grateful for his great code. + */ + +package org.apache.activemq.console.command.store.tar; + +/** + * This interface contains all the definitions used in the package. + * + */ +// CheckStyle:InterfaceIsTypeCheck OFF (bc) +public interface TarConstants { + + /** + * The length of the name field in a header buffer. + */ + int NAMELEN = 100; + + /** + * The length of the mode field in a header buffer. + */ + int MODELEN = 8; + + /** + * The length of the user id field in a header buffer. + */ + int UIDLEN = 8; + + /** + * The length of the group id field in a header buffer. + */ + int GIDLEN = 8; + + /** + * The length of the checksum field in a header buffer. + */ + int CHKSUMLEN = 8; + + /** + * The length of the size field in a header buffer. + */ + int SIZELEN = 12; + + /** + * The maximum size of a file in a tar archive (That's 11 sevens, octal). + */ + long MAXSIZE = 077777777777L; + + /** + * The length of the magic field in a header buffer. + */ + int MAGICLEN = 8; + + /** + * The length of the modification time field in a header buffer. + */ + int MODTIMELEN = 12; + + /** + * The length of the user name field in a header buffer. 
+ */ + int UNAMELEN = 32; + + /** + * The length of the group name field in a header buffer. + */ + int GNAMELEN = 32; + + /** + * The length of the devices field in a header buffer. + */ + int DEVLEN = 8; + + /** + * LF_ constants represent the "link flag" of an entry, or more commonly, + * the "entry type". This is the "old way" of indicating a normal file. + */ + byte LF_OLDNORM = 0; + + /** + * Normal file type. + */ + byte LF_NORMAL = (byte) '0'; + + /** + * Link file type. + */ + byte LF_LINK = (byte) '1'; + + /** + * Symbolic link file type. + */ + byte LF_SYMLINK = (byte) '2'; + + /** + * Character device file type. + */ + byte LF_CHR = (byte) '3'; + + /** + * Block device file type. + */ + byte LF_BLK = (byte) '4'; + + /** + * Directory file type. + */ + byte LF_DIR = (byte) '5'; + + /** + * FIFO (pipe) file type. + */ + byte LF_FIFO = (byte) '6'; + + /** + * Contiguous file type. + */ + byte LF_CONTIG = (byte) '7'; + + /** + * The magic tag representing a POSIX tar archive. + */ + String TMAGIC = "ustar"; + + /** + * The magic tag representing a GNU tar archive. + */ + String GNU_TMAGIC = "ustar "; + + /** + * The name of the GNU tar entry which contains a long name. + */ + String GNU_LONGLINK = "././@LongLink"; + + /** + * Identifies the *next* file on the tape as having a long name.
+ */ + byte LF_GNUTYPE_LONGNAME = (byte) 'L'; +} Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java?rev=1402652&view=auto ============================================================================== --- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java (added) +++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarEntry.java Fri Oct 26 20:28:49 2012 @@ -0,0 +1,664 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * This package is based on the work done by Timothy Gerard Endres + * (time@ice.com) to whom the Ant project is very grateful for his great code. + */ + +package org.apache.activemq.console.command.store.tar; + +import java.io.File; +import java.util.Date; +import java.util.Locale; + +/** + * This class represents an entry in a Tar archive. It consists + * of the entry's header, as well as the entry's File. 
Entries + * can be instantiated in one of three ways, depending on how + * they are to be used. + * <p>
+ * TarEntries that are created from the header bytes read from + * an archive are instantiated with the TarEntry( byte[] ) + * constructor. These entries will be used when extracting from + * or listing the contents of an archive. These entries have their + * header filled in using the header bytes. They also set the File + * to null, since they reference an archive entry not a file. + * <p>
+ * TarEntries that are created from Files that are to be written + * into an archive are instantiated with the TarEntry( File ) + * constructor. These entries have their header filled in using + * the File's information. They also keep a reference to the File + * for convenience when writing entries. + * <p>
+ * Finally, TarEntries can be constructed from nothing but a name. + * This allows the programmer to construct the entry by hand, for + * instance when only an InputStream is available for writing to + * the archive, and the header information is constructed from + * other information. In this case the header fields are set to + * defaults and the File is set to null. + * + * <p>
+ * The C structure for a Tar Entry's header is: + * <pre>
+ * struct header {
+ * char name[NAMSIZ];
+ * char mode[8];
+ * char uid[8];
+ * char gid[8];
+ * char size[12];
+ * char mtime[12];
+ * char chksum[8];
+ * char linkflag;
+ * char linkname[NAMSIZ];
+ * char magic[8];
+ * char uname[TUNMLEN];
+ * char gname[TGNMLEN];
+ * char devmajor[8];
+ * char devminor[8];
+ * } header;
+ * </pre>
+ * + */ + +public class TarEntry implements TarConstants { + /** The entry's name. */ + private StringBuffer name; + + /** The entry's permission mode. */ + private int mode; + + /** The entry's user id. */ + private int userId; + + /** The entry's group id. */ + private int groupId; + + /** The entry's size. */ + private long size; + + /** The entry's modification time. */ + private long modTime; + + /** The entry's link flag. */ + private byte linkFlag; + + /** The entry's link name. */ + private StringBuffer linkName; + + /** The entry's magic tag. */ + private StringBuffer magic; + + /** The entry's user name. */ + private StringBuffer userName; + + /** The entry's group name. */ + private StringBuffer groupName; + + /** The entry's major device number. */ + private int devMajor; + + /** The entry's minor device number. */ + private int devMinor; + + /** The entry's file reference */ + private File file; + + /** Maximum length of a user's name in the tar file */ + public static final int MAX_NAMELEN = 31; + + /** Default permissions bits for directories */ + public static final int DEFAULT_DIR_MODE = 040755; + + /** Default permissions bits for files */ + public static final int DEFAULT_FILE_MODE = 0100644; + + /** Convert millis to seconds */ + public static final int MILLIS_PER_SECOND = 1000; + + /** + * Construct an empty entry and prepares the header values. + */ + private TarEntry () { + this.magic = new StringBuffer(TMAGIC); + this.name = new StringBuffer(); + this.linkName = new StringBuffer(); + + String user = System.getProperty("user.name", ""); + + if (user.length() > MAX_NAMELEN) { + user = user.substring(0, MAX_NAMELEN); + } + + this.userId = 0; + this.groupId = 0; + this.userName = new StringBuffer(user); + this.groupName = new StringBuffer(""); + this.file = null; + } + + /** + * Construct an entry with only a name. This allows the programmer + * to construct the entry's header "by hand". File is set to null. 
+ * + * @param name the entry name + */ + public TarEntry(String name) { + this(name, false); + } + + /** + * Construct an entry with only a name. This allows the programmer + * to construct the entry's header "by hand". File is set to null. + * + * @param name the entry name + * @param preserveLeadingSlashes whether to allow leading slashes + * in the name. + */ + public TarEntry(String name, boolean preserveLeadingSlashes) { + this(); + + name = normalizeFileName(name, preserveLeadingSlashes); + boolean isDir = name.endsWith("/"); + + this.devMajor = 0; + this.devMinor = 0; + this.name = new StringBuffer(name); + this.mode = isDir ? DEFAULT_DIR_MODE : DEFAULT_FILE_MODE; + this.linkFlag = isDir ? LF_DIR : LF_NORMAL; + this.userId = 0; + this.groupId = 0; + this.size = 0; + this.modTime = (new Date()).getTime() / MILLIS_PER_SECOND; + this.linkName = new StringBuffer(""); + this.userName = new StringBuffer(""); + this.groupName = new StringBuffer(""); + this.devMajor = 0; + this.devMinor = 0; + + } + + /** + * Construct an entry with a name and a link flag. + * + * @param name the entry name + * @param linkFlag the entry link flag. + */ + public TarEntry(String name, byte linkFlag) { + this(name); + this.linkFlag = linkFlag; + if (linkFlag == LF_GNUTYPE_LONGNAME) { + magic = new StringBuffer(GNU_TMAGIC); + } + } + + /** + * Construct an entry for a file. File is set to file, and the + * header is constructed from information from the file. + * + * @param file The file that the entry represents. 
+ */ + public TarEntry(File file) { + this(); + + this.file = file; + + String fileName = normalizeFileName(file.getPath(), false); + this.linkName = new StringBuffer(""); + this.name = new StringBuffer(fileName); + + if (file.isDirectory()) { + this.mode = DEFAULT_DIR_MODE; + this.linkFlag = LF_DIR; + + int nameLength = name.length(); + if (nameLength == 0 || name.charAt(nameLength - 1) != '/') { + this.name.append("/"); + } + this.size = 0; + } else { + this.mode = DEFAULT_FILE_MODE; + this.linkFlag = LF_NORMAL; + this.size = file.length(); + } + + this.modTime = file.lastModified() / MILLIS_PER_SECOND; + this.devMajor = 0; + this.devMinor = 0; + } + + /** + * Construct an entry from an archive's header bytes. File is set + * to null. + * + * @param headerBuf The header bytes from a tar archive entry. + */ + public TarEntry(byte[] headerBuf) { + this(); + parseTarHeader(headerBuf); + } + + /** + * Determine if the two entries are equal. Equality is determined + * by the header names being equal. + * + * @param it Entry to be checked for equality. + * @return True if the entries are equal. + */ + public boolean equals(TarEntry it) { + return getName().equals(it.getName()); + } + + /** + * Determine if the two entries are equal. Equality is determined + * by the header names being equal. + * + * @param it Entry to be checked for equality. + * @return True if the entries are equal. + */ + public boolean equals(Object it) { + if (it == null || getClass() != it.getClass()) { + return false; + } + return equals((TarEntry) it); + } + + /** + * Hashcodes are based on entry names. + * + * @return the entry hashcode + */ + public int hashCode() { + return getName().hashCode(); + } + + /** + * Determine if the given entry is a descendant of this entry. + * Descendancy is determined by the name of the descendant + * starting with this entry's name. + * + * @param desc Entry to be checked as a descendent of this. + * @return True if entry is a descendant of this. 
+ */ + public boolean isDescendent(TarEntry desc) { + return desc.getName().startsWith(getName()); + } + + /** + * Get this entry's name. + * + * @return This entry's name. + */ + public String getName() { + return name.toString(); + } + + /** + * Set this entry's name. + * + * @param name This entry's new name. + */ + public void setName(String name) { + this.name = new StringBuffer(normalizeFileName(name, false)); + } + + /** + * Set the mode for this entry + * + * @param mode the mode for this entry + */ + public void setMode(int mode) { + this.mode = mode; + } + + /** + * Get this entry's link name. + * + * @return This entry's link name. + */ + public String getLinkName() { + return linkName.toString(); + } + + /** + * Get this entry's user id. + * + * @return This entry's user id. + */ + public int getUserId() { + return userId; + } + + /** + * Set this entry's user id. + * + * @param userId This entry's new user id. + */ + public void setUserId(int userId) { + this.userId = userId; + } + + /** + * Get this entry's group id. + * + * @return This entry's group id. + */ + public int getGroupId() { + return groupId; + } + + /** + * Set this entry's group id. + * + * @param groupId This entry's new group id. + */ + public void setGroupId(int groupId) { + this.groupId = groupId; + } + + /** + * Get this entry's user name. + * + * @return This entry's user name. + */ + public String getUserName() { + return userName.toString(); + } + + /** + * Set this entry's user name. + * + * @param userName This entry's new user name. + */ + public void setUserName(String userName) { + this.userName = new StringBuffer(userName); + } + + /** + * Get this entry's group name. + * + * @return This entry's group name. + */ + public String getGroupName() { + return groupName.toString(); + } + + /** + * Set this entry's group name. + * + * @param groupName This entry's new group name. 
+ */ + public void setGroupName(String groupName) { + this.groupName = new StringBuffer(groupName); + } + + /** + * Convenience method to set this entry's group and user ids. + * + * @param userId This entry's new user id. + * @param groupId This entry's new group id. + */ + public void setIds(int userId, int groupId) { + setUserId(userId); + setGroupId(groupId); + } + + /** + * Convenience method to set this entry's group and user names. + * + * @param userName This entry's new user name. + * @param groupName This entry's new group name. + */ + public void setNames(String userName, String groupName) { + setUserName(userName); + setGroupName(groupName); + } + + /** + * Set this entry's modification time. The parameter passed + * to this method is in "Java time". + * + * @param time This entry's new modification time. + */ + public void setModTime(long time) { + modTime = time / MILLIS_PER_SECOND; + } + + /** + * Set this entry's modification time. + * + * @param time This entry's new modification time. + */ + public void setModTime(Date time) { + modTime = time.getTime() / MILLIS_PER_SECOND; + } + + /** + * Get this entry's modification time. + * + * @return This entry's modification time. + */ + public Date getModTime() { + return new Date(modTime * MILLIS_PER_SECOND); + } + + /** + * Get this entry's file. + * + * @return This entry's file. + */ + public File getFile() { + return file; + } + + /** + * Get this entry's mode. + * + * @return This entry's mode. + */ + public int getMode() { + return mode; + } + + /** + * Get this entry's file size. + * + * @return This entry's file size. + */ + public long getSize() { + return size; + } + + /** + * Set this entry's file size. + * + * @param size This entry's new file size.
+ */ + public void setSize(long size) { + this.size = size; + } + + + /** + * Indicate if this entry is a GNU long name block + * + * @return true if this is a long name extension provided by GNU tar + */ + public boolean isGNULongNameEntry() { + return linkFlag == LF_GNUTYPE_LONGNAME + && name.toString().equals(GNU_LONGLINK); + } + + /** + * Return whether or not this entry represents a directory. + * + * @return True if this entry is a directory. + */ + public boolean isDirectory() { + if (file != null) { + return file.isDirectory(); + } + + if (linkFlag == LF_DIR) { + return true; + } + + if (getName().endsWith("/")) { + return true; + } + + return false; + } + + /** + * If this entry represents a file, and the file is a directory, return + * an array of TarEntries for this entry's children. + * + * @return An array of TarEntry's for this entry's children. + */ + public TarEntry[] getDirectoryEntries() { + if (file == null || !file.isDirectory()) { + return new TarEntry[0]; + } + + String[] list = file.list(); + TarEntry[] result = new TarEntry[list.length]; + + for (int i = 0; i < list.length; ++i) { + result[i] = new TarEntry(new File(file, list[i])); + } + + return result; + } + + /** + * Write an entry's header information to a header buffer. + * + * @param outbuf The tar entry header buffer to fill in. 
+ */ + public void writeEntryHeader(byte[] outbuf) { + int offset = 0; + + offset = TarUtils.getNameBytes(name, outbuf, offset, NAMELEN); + offset = TarUtils.getOctalBytes(mode, outbuf, offset, MODELEN); + offset = TarUtils.getOctalBytes(userId, outbuf, offset, UIDLEN); + offset = TarUtils.getOctalBytes(groupId, outbuf, offset, GIDLEN); + offset = TarUtils.getLongOctalBytes(size, outbuf, offset, SIZELEN); + offset = TarUtils.getLongOctalBytes(modTime, outbuf, offset, MODTIMELEN); + + int csOffset = offset; + + for (int c = 0; c < CHKSUMLEN; ++c) { + outbuf[offset++] = (byte) ' '; + } + + outbuf[offset++] = linkFlag; + offset = TarUtils.getNameBytes(linkName, outbuf, offset, NAMELEN); + offset = TarUtils.getNameBytes(magic, outbuf, offset, MAGICLEN); + offset = TarUtils.getNameBytes(userName, outbuf, offset, UNAMELEN); + offset = TarUtils.getNameBytes(groupName, outbuf, offset, GNAMELEN); + offset = TarUtils.getOctalBytes(devMajor, outbuf, offset, DEVLEN); + offset = TarUtils.getOctalBytes(devMinor, outbuf, offset, DEVLEN); + + while (offset < outbuf.length) { + outbuf[offset++] = 0; + } + + long chk = TarUtils.computeCheckSum(outbuf); + + TarUtils.getCheckSumOctalBytes(chk, outbuf, csOffset, CHKSUMLEN); + } + + /** + * Parse an entry's header information from a header buffer. + * + * @param header The tar entry header buffer to get information from. 
+ */ + public void parseTarHeader(byte[] header) { + int offset = 0; + + name = TarUtils.parseName(header, offset, NAMELEN); + offset += NAMELEN; + mode = (int) TarUtils.parseOctal(header, offset, MODELEN); + offset += MODELEN; + userId = (int) TarUtils.parseOctal(header, offset, UIDLEN); + offset += UIDLEN; + groupId = (int) TarUtils.parseOctal(header, offset, GIDLEN); + offset += GIDLEN; + size = TarUtils.parseOctal(header, offset, SIZELEN); + offset += SIZELEN; + modTime = TarUtils.parseOctal(header, offset, MODTIMELEN); + offset += MODTIMELEN; + offset += CHKSUMLEN; + linkFlag = header[offset++]; + linkName = TarUtils.parseName(header, offset, NAMELEN); + offset += NAMELEN; + magic = TarUtils.parseName(header, offset, MAGICLEN); + offset += MAGICLEN; + userName = TarUtils.parseName(header, offset, UNAMELEN); + offset += UNAMELEN; + groupName = TarUtils.parseName(header, offset, GNAMELEN); + offset += GNAMELEN; + devMajor = (int) TarUtils.parseOctal(header, offset, DEVLEN); + offset += DEVLEN; + devMinor = (int) TarUtils.parseOctal(header, offset, DEVLEN); + } + + /** + * Strips Windows' drive letter as well as any leading slashes, + * turns path separators into forward slashes. + */ + private static String normalizeFileName(String fileName, + boolean preserveLeadingSlashes) { + String osname = System.getProperty("os.name").toLowerCase(Locale.ENGLISH); + + if (osname != null) { + + // Strip off drive letters! + // REVIEW Would a better check be "(File.separator == '\')"?
+ + if (osname.startsWith("windows")) { + if (fileName.length() > 2) { + char ch1 = fileName.charAt(0); + char ch2 = fileName.charAt(1); + + if (ch2 == ':' + && ((ch1 >= 'a' && ch1 <= 'z') + || (ch1 >= 'A' && ch1 <= 'Z'))) { + fileName = fileName.substring(2); + } + } + } else if (osname.indexOf("netware") > -1) { + int colon = fileName.indexOf(':'); + if (colon != -1) { + fileName = fileName.substring(colon + 1); + } + } + } + + fileName = fileName.replace(File.separatorChar, '/'); + + // No absolute pathnames + // Windows (and Posix?) paths can start with "\\NetworkDrive\", + // so we loop on starting /'s. + while (!preserveLeadingSlashes && fileName.startsWith("/")) { + fileName = fileName.substring(1); + } + return fileName; + } +} Added: activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java?rev=1402652&view=auto ============================================================================== --- activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java (added) +++ activemq/trunk/activemq-console/src/main/java/org/apache/activemq/console/command/store/tar/TarInputStream.java Fri Oct 26 20:28:49 2012 @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * This package is based on the work done by Timothy Gerard Endres + * (time@ice.com) to whom the Ant project is very grateful for his great code. + */ + +package org.apache.activemq.console.command.store.tar; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * The TarInputStream reads a UNIX tar archive as an InputStream. + * Methods are provided to position at each successive entry in + * the archive, and then read each entry as a normal input stream + * using read(). + * + */ +public class TarInputStream extends FilterInputStream { + private static final int SMALL_BUFFER_SIZE = 256; + private static final int BUFFER_SIZE = 8 * 1024; + private static final int LARGE_BUFFER_SIZE = 32 * 1024; + private static final int BYTE_MASK = 0xFF; + + // CheckStyle:VisibilityModifier OFF - bc + protected boolean debug; + protected boolean hasHitEOF; + protected long entrySize; + protected long entryOffset; + protected byte[] readBuf; + protected TarBuffer buffer; + protected TarEntry currEntry; + + /** + * The contents of this array are not used at all in this class; + * it is only here to avoid repeated object creation during calls + * to the no-arg read method. + */ + protected byte[] oneBuf; + + // CheckStyle:VisibilityModifier ON + + /** + * Constructor for TarInputStream.
+ * @param is the input stream to use + */ + public TarInputStream(InputStream is) { + this(is, TarBuffer.DEFAULT_BLKSIZE, TarBuffer.DEFAULT_RCDSIZE); + } + + /** + * Constructor for TarInputStream. + * @param is the input stream to use + * @param blockSize the block size to use + */ + public TarInputStream(InputStream is, int blockSize) { + this(is, blockSize, TarBuffer.DEFAULT_RCDSIZE); + } + + /** + * Constructor for TarInputStream. + * @param is the input stream to use + * @param blockSize the block size to use + * @param recordSize the record size to use + */ + public TarInputStream(InputStream is, int blockSize, int recordSize) { + super(is); + + this.buffer = new TarBuffer(is, blockSize, recordSize); + this.readBuf = null; + this.oneBuf = new byte[1]; + this.debug = false; + this.hasHitEOF = false; + } + + /** + * Sets the debugging flag. + * + * @param debug True to turn on debugging. + */ + public void setDebug(boolean debug) { + this.debug = debug; + buffer.setDebug(debug); + } + + /** + * Closes this stream. Calls the TarBuffer's close() method. + * @throws IOException on error + */ + public void close() throws IOException { + buffer.close(); + } + + /** + * Get the record size being used by this stream's TarBuffer. + * + * @return The TarBuffer record size. + */ + public int getRecordSize() { + return buffer.getRecordSize(); + } + + /** + * Get the available data that can be read from the current + * entry in the archive. This does not indicate how much data + * is left in the entire archive, only in the current entry. + * This value is determined from the entry's size header field + * and the amount of data already read from the current entry. + * Integer.MAX_VALUE is returned in case more than Integer.MAX_VALUE + * bytes are left in the current entry in the archive. + * + * @return The number of available bytes for the current entry.
+ * @throws IOException for signature + */ + public int available() throws IOException { + if (entrySize - entryOffset > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + return (int) (entrySize - entryOffset); + } + + /** + * Skip bytes in the input buffer. This skips bytes in the + * current entry's data, not the entire archive, and will + * stop at the end of the current entry's data if the number + * to skip extends beyond that point. + * + * @param numToSkip The number of bytes to skip. + * @return the number actually skipped + * @throws IOException on error + */ + public long skip(long numToSkip) throws IOException { + // REVIEW + // This is horribly inefficient, but it ensures that we + // properly skip over bytes via the TarBuffer... + // + byte[] skipBuf = new byte[BUFFER_SIZE]; + long skip = numToSkip; + while (skip > 0) { + int realSkip = (int) (skip > skipBuf.length ? skipBuf.length : skip); + int numRead = read(skipBuf, 0, realSkip); + if (numRead == -1) { + break; + } + skip -= numRead; + } + return (numToSkip - skip); + } + + /** + * Since we do not support marking just yet, we return false. + * + * @return False. + */ + public boolean markSupported() { + return false; + } + + /** + * Since we do not support marking just yet, we do nothing. + * + * @param markLimit The limit to mark. + */ + public void mark(int markLimit) { + } + + /** + * Since we do not support marking just yet, we do nothing. + */ + public void reset() { + } + + /** + * Get the next entry in this tar archive. This will skip + * over any remaining data in the current entry, if there + * is one, and place the input stream at the header of the + * next entry, and read the header and instantiate a new + * TarEntry from the header bytes and return that entry. + * If there are no more entries in the archive, null will + * be returned to indicate that the end of the archive has + * been reached. + * + * @return The next TarEntry in the archive, or null. 
+ * @throws IOException on error + */ + public TarEntry getNextEntry() throws IOException { + if (hasHitEOF) { + return null; + } + + if (currEntry != null) { + long numToSkip = entrySize - entryOffset; + + if (debug) { + System.err.println("TarInputStream: SKIP currENTRY '" + + currEntry.getName() + "' SZ " + + entrySize + " OFF " + + entryOffset + " skipping " + + numToSkip + " bytes"); + } + + while (numToSkip > 0) { + long skipped = skip(numToSkip); + if (skipped <= 0) { + throw new RuntimeException("failed to skip current tar" + + " entry"); + } + numToSkip -= skipped; + } + + readBuf = null; + } + + byte[] headerBuf = buffer.readRecord(); + + if (headerBuf == null) { + if (debug) { + System.err.println("READ NULL RECORD"); + } + hasHitEOF = true; + } else if (buffer.isEOFRecord(headerBuf)) { + if (debug) { + System.err.println("READ EOF RECORD"); + } + hasHitEOF = true; + } + + if (hasHitEOF) { + currEntry = null; + } else { + currEntry = new TarEntry(headerBuf); + + if (debug) { + System.err.println("TarInputStream: SET CURRENTRY '" + + currEntry.getName() + + "' size = " + + currEntry.getSize()); + } + + entryOffset = 0; + + entrySize = currEntry.getSize(); + } + + if (currEntry != null && currEntry.isGNULongNameEntry()) { + // read in the name + StringBuffer longName = new StringBuffer(); + byte[] buf = new byte[SMALL_BUFFER_SIZE]; + int length = 0; + while ((length = read(buf)) >= 0) { + longName.append(new String(buf, 0, length)); + } + getNextEntry(); + if (currEntry == null) { + // Bugzilla: 40334 + // Malformed tar file - long entry name not followed by entry + return null; + } + // remove trailing null terminator + if (longName.length() > 0 + && longName.charAt(longName.length() - 1) == 0) { + longName.deleteCharAt(longName.length() - 1); + } + currEntry.setName(longName.toString()); + } + + return currEntry; + } + + /** + * Reads a byte from the current tar archive entry. + * + * This method simply calls read( byte[], int, int ). 
+ * + * @return The byte read, or -1 at EOF. + * @throws IOException on error + */ + public int read() throws IOException { + int num = read(oneBuf, 0, 1); + return num == -1 ? -1 : ((int) oneBuf[0]) & BYTE_MASK; + } + + /** + * Reads bytes from the current tar archive entry. + * + * This method is aware of the boundaries of the current + * entry in the archive and will deal with them as if they + * were this stream's start and EOF. + * + * @param buf The buffer into which to place bytes read. + * @param offset The offset at which to place bytes read. + * @param numToRead The number of bytes to read. + * @return The number of bytes read, or -1 at EOF. + * @throws IOException on error + */ + public int read(byte[] buf, int offset, int numToRead) throws IOException { + int totalRead = 0; + + if (entryOffset >= entrySize) { + return -1; + } + + if ((numToRead + entryOffset) > entrySize) { + numToRead = (int) (entrySize - entryOffset); + } + + if (readBuf != null) { + int sz = (numToRead > readBuf.length) ? readBuf.length + : numToRead; + + System.arraycopy(readBuf, 0, buf, offset, sz); + + if (sz >= readBuf.length) { + readBuf = null; + } else { + int newLen = readBuf.length - sz; + byte[] newBuf = new byte[newLen]; + + System.arraycopy(readBuf, sz, newBuf, 0, newLen); + + readBuf = newBuf; + } + + totalRead += sz; + numToRead -= sz; + offset += sz; + } + + while (numToRead > 0) { + byte[] rec = buffer.readRecord(); + + if (rec == null) { + // Unexpected EOF! 
+ throw new IOException("unexpected EOF with " + numToRead + + " bytes unread"); + } + + int sz = numToRead; + int recLen = rec.length; + + if (recLen > sz) { + System.arraycopy(rec, 0, buf, offset, sz); + + readBuf = new byte[recLen - sz]; + + System.arraycopy(rec, sz, readBuf, 0, recLen - sz); + } else { + sz = recLen; + + System.arraycopy(rec, 0, buf, offset, recLen); + } + + totalRead += sz; + numToRead -= sz; + offset += sz; + } + + entryOffset += totalRead; + + return totalRead; + } + + /** + * Copies the contents of the current tar archive entry directly into + * an output stream. + * + * @param out The OutputStream into which to write the entry's data. + * @throws IOException on error + */ + public void copyEntryContents(OutputStream out) throws IOException { + byte[] buf = new byte[LARGE_BUFFER_SIZE]; + + while (true) { + int numRead = read(buf, 0, buf.length); + + if (numRead == -1) { + break; + } + + out.write(buf, 0, numRead); + } + } +}