activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r818209 - in /activemq/trunk/activemq-optional: ./ src/main/java/org/apache/activemq/store/ src/main/java/org/apache/activemq/store/amq/ src/main/java/org/apache/activemq/store/amq/reader/ src/main/java/org/apache/activemq/transport/discove...
Date Wed, 23 Sep 2009 19:07:00 GMT
Author: chirino
Date: Wed Sep 23 19:06:58 2009
New Revision: 818209

URL: http://svn.apache.org/viewvc?rev=818209&view=rev
Log:
AMQ-2408 and AMQ-2407 
 - adding new tool to manually inspect/audit the amqPersistenceAdapter's journal files.
 - adding new HTTP based discovery agent


Added:
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalTool.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalToolCommand.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/CommandLineSupport.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/CustomResourceLoader.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/Entry.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/MessageBodyFormatter.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQIterator.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQReader.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/MessageLocation.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/DiscoveryRegistryServlet.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgentFactory.java   (with props)
    activemq/trunk/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/
    activemq/trunk/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/http
    activemq/trunk/activemq-optional/src/main/resources/org/
    activemq/trunk/activemq-optional/src/main/resources/org/apache/
    activemq/trunk/activemq-optional/src/main/resources/org/apache/activemq/
    activemq/trunk/activemq-optional/src/main/resources/org/apache/activemq/store/
    activemq/trunk/activemq-optional/src/main/resources/org/apache/activemq/store/amq/
    activemq/trunk/activemq-optional/src/main/resources/org/apache/activemq/store/amq/help.txt
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/store/
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/store/amq/
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/store/amq/reader/
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/store/amq/reader/AMQReaderTest.java
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-1   (with props)
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-2   (with props)
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-3   (with props)
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-4   (with props)
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-5   (with props)
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-6   (with props)
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-7   (with props)
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-8   (with props)
    activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-9   (with props)
Modified:
    activemq/trunk/activemq-optional/pom.xml

Modified: activemq/trunk/activemq-optional/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/pom.xml?rev=818209&r1=818208&r2=818209&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/pom.xml (original)
+++ activemq/trunk/activemq-optional/pom.xml Wed Sep 23 19:06:58 2009
@@ -44,7 +44,11 @@
       <groupId>${pom.groupId}</groupId>
       <artifactId>activeio-core</artifactId>
     </dependency>    
-    
+    <dependency>
+      <groupId>${pom.groupId}</groupId>
+      <artifactId>activemq-console</artifactId>
+    </dependency>
+        
     <dependency>
       <groupId>org.springframework</groupId>
       <artifactId>spring</artifactId>
@@ -153,7 +157,22 @@
       <groupId>org.codehaus.jettison</groupId>
       <artifactId>jettison</artifactId>
       <scope>test</scope>
-    </dependency>       
+    </dependency>  
+    
+    <dependency>
+      <groupId>velocity</groupId>
+      <artifactId>velocity</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>net.sf.josql</groupId>
+      <artifactId>josql</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>net.sf.josql</groupId>
+      <artifactId>gentlyweb-utils</artifactId>
+    </dependency>
+    
+         
   </dependencies>
 
   <build>

Added: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalTool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalTool.java?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalTool.java (added)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalTool.java Wed Sep 23 19:06:58 2009
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.JournalQueueAck;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.JournalTrace;
+import org.apache.activemq.command.JournalTransaction;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.kaha.impl.async.ReadOnlyAsyncDataManager;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.velocity.Template;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.Velocity;
+import org.apache.velocity.app.VelocityEngine;
+import org.josql.Query;
+
+/**
+ * Allows you to view the contents of a Journal.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AMQJournalTool {
+
+	private final ArrayList<File> dirs = new ArrayList<File>();
+	private final WireFormat wireFormat = new OpenWireFormat();
+	private final HashMap<String, String> resources = new HashMap<String, String>();
+
+	private String messageFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}";
+	private String topicAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}";
+	private String queueAckFormat = "${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}";
+	private String transactionFormat = "${location.dataFileId},${location.offset}|${type}|${record.transactionId}";
+	private String traceFormat = "${location.dataFileId},${location.offset}|${type}|${record.message}";
+	private String unknownFormat = "${location.dataFileId},${location.offset}|${type}|${record.class.name}";
+	private String where;
+	private VelocityContext context;
+	private VelocityEngine velocity;
+	private boolean help;
+
+	/**
+	 * Command line entry point. Arguments of the form <code>--name=value</code>
+	 * are applied to the tool through
+	 * {@link CommandLineSupport#setOptions(Object, String[])}; every remaining
+	 * argument is treated as a journal directory to scan.
+	 *
+	 * @param args the command line options and journal directories
+	 * @throws Exception if {@link #execute()} fails
+	 */
+	public static void main(String[] args) throws Exception {
+		AMQJournalTool consumerTool = new AMQJournalTool();
+		String[] directories = CommandLineSupport
+				.setOptions(consumerTool, args);
+		// Only insist on directories when help was not requested; otherwise
+		// "--help" on its own would return here and never reach the help
+		// handling in execute().
+		if (directories.length < 1 && !consumerTool.isHelp()) {
+			System.out
+					.println("Please specify the directories with journal data to scan");
+			return;
+		}
+		for (int i = 0; i < directories.length; i++) {
+			consumerTool.getDirs().add(new File(directories[i]));
+		}
+		consumerTool.execute();
+	}
+
+	/**
+	 * Runs the tool: prints the help text when it was requested, validates the
+	 * configured directories, prepares the Velocity templates and the optional
+	 * where-clause query, then walks the journal from its first location and
+	 * hands every record to {@link #process(Entry)}.
+	 *
+	 * @throws Exception if parsing the where clause, reading the journal or
+	 *             rendering a record fails
+	 */
+	public void execute() throws Exception {
+
+		if (help) {
+			showHelp();
+			return;
+		}
+
+		if (getDirs().isEmpty()) {
+			System.out.println("");
+			System.out.println("Invalid Usage: Please specify the directories with journal data to scan");
+			System.out.println("");
+			showHelp();
+			return;
+		}
+
+		for (File dir : getDirs()) {
+			if (!dir.exists()) {
+				System.out.println("");
+				System.out.println("Invalid Usage: the directory '"+dir.getPath()+"' does not exist");
+				System.out.println("");
+				showHelp();
+				return;
+			}
+			if (!dir.isDirectory()) {
+				System.out.println("");
+				System.out.println("Invalid Usage: the argument '"+dir.getPath()+"' is not a directory");
+				System.out.println("");
+				showHelp();
+				return;
+			}
+		}
+
+		context = new VelocityContext();
+
+		// Expose the system properties to the templates. Properties is a
+		// Hashtable<Object,Object>, so entries that are not String/String
+		// pairs are skipped instead of failing with a ClassCastException.
+		for (Map.Entry<Object, Object> kv : System.getProperties().entrySet()) {
+			if (kv.getKey() instanceof String && kv.getValue() instanceof String) {
+				String name = (String) kv.getKey();
+				if (!context.containsKey(name)) {
+					context.put(name, kv.getValue());
+				}
+			}
+		}
+
+		velocity = new VelocityEngine();
+		velocity.setProperty(Velocity.RESOURCE_LOADER, "all");
+		velocity.setProperty("all.resource.loader.class", CustomResourceLoader.class.getName());
+		velocity.init();
+
+		// The format strings are registered as named templates; display()
+		// hands this map to CustomResourceLoader before rendering.
+		resources.put("message", messageFormat);
+		resources.put("topicAck", topicAckFormat);
+		resources.put("queueAck", queueAckFormat);
+		resources.put("transaction", transactionFormat);
+		resources.put("trace", traceFormat);
+		resources.put("unknown", unknownFormat);
+
+		// The where clause is parsed once and shared by every entry.
+		Query query = null;
+		if (where != null) {
+			query = new Query();
+			query.parse("select * from "+Entry.class.getName()+" where "+where);
+		}
+
+		ReadOnlyAsyncDataManager manager = new ReadOnlyAsyncDataManager(getDirs());
+		manager.start();
+		try {
+			Location curr = manager.getFirstLocation();
+			while (curr != null) {
+
+				ByteSequence data = manager.read(curr);
+				DataStructure c = (DataStructure) wireFormat.unmarshal(data);
+
+				Entry entry = new Entry();
+				entry.setLocation(curr);
+				entry.setRecord(c);
+				entry.setData(data);
+				entry.setQuery(query);
+				process(entry);
+
+				curr = manager.getNextLocation(curr);
+			}
+		} finally {
+			manager.close();
+		}
+	}
+
+	/**
+	 * Prints the <code>help.txt</code> resource that sits next to this class
+	 * to standard output, line by line.
+	 */
+	private void showHelp() {
+		InputStream is = AMQJournalTool.class.getResourceAsStream("help.txt");
+		if (is == null) {
+			// A missing resource would otherwise surface as a
+			// NullPointerException from the Scanner constructor.
+			System.out.println("Help is not available: help.txt was not found on the classpath");
+			return;
+		}
+		Scanner scanner = new Scanner(is);
+		try {
+			while (scanner.hasNextLine()) {
+				System.out.println(scanner.nextLine());
+			}
+		} finally {
+			// Closing the scanner also closes the underlying stream.
+			scanner.close();
+		}
+	}
+
+	/**
+	 * Classifies a journal record by its data structure type, stores the
+	 * display type and the name of the format template on the entry, and then
+	 * displays it.
+	 *
+	 * @param entry the journal entry to classify and display
+	 * @throws Exception if displaying the entry fails
+	 */
+	private void process(Entry entry) throws Exception {
+
+		DataStructure record = entry.getRecord();
+		String type;
+		String formatter;
+
+		switch (record.getDataStructureType()) {
+		case ActiveMQMessage.DATA_STRUCTURE_TYPE:
+			type = "ActiveMQMessage";
+			formatter = "message";
+			break;
+		case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
+			type = "ActiveMQBytesMessage";
+			formatter = "message";
+			break;
+		case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
+			type = "ActiveMQBlobMessage";
+			formatter = "message";
+			break;
+		case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
+			type = "ActiveMQMapMessage";
+			formatter = "message";
+			break;
+		case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
+			type = "ActiveMQObjectMessage";
+			formatter = "message";
+			break;
+		case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
+			type = "ActiveMQStreamMessage";
+			formatter = "message";
+			break;
+		case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
+			type = "ActiveMQTextMessage";
+			formatter = "message";
+			break;
+		case JournalQueueAck.DATA_STRUCTURE_TYPE:
+			type = "Queue Ack";
+			formatter = "queueAck";
+			break;
+		case JournalTopicAck.DATA_STRUCTURE_TYPE:
+			type = "Topic Ack";
+			formatter = "topicAck";
+			break;
+		case JournalTransaction.DATA_STRUCTURE_TYPE:
+			type = getType((JournalTransaction) record);
+			formatter = "transaction";
+			break;
+		case JournalTrace.DATA_STRUCTURE_TYPE:
+			type = "Trace";
+			formatter = "trace";
+			break;
+		default:
+			type = "Unknown";
+			formatter = "unknown";
+			break;
+		}
+
+		// Every branch ends the same way, so the entry is updated and
+		// displayed once here.
+		entry.setType(type);
+		entry.setFormater(formatter);
+		display(entry);
+	}
+
+	private String getType(JournalTransaction record) {
+		switch (record.getType()) {
+		case JournalTransaction.XA_PREPARE:
+			return "XA Prepare";
+		case JournalTransaction.XA_COMMIT:
+			return "XA Commit";
+		case JournalTransaction.XA_ROLLBACK:
+			return "XA Rollback";
+		case JournalTransaction.LOCAL_COMMIT:
+			return "Commit";
+		case JournalTransaction.LOCAL_ROLLBACK:
+			return "Rollback";
+		}
+		return "Unknown Transaction";
+	}
+
+	/**
+	 * Renders one entry to standard output using the Velocity template named
+	 * by the entry's formatter. When the entry carries a query, the entry is
+	 * only rendered if the query selects it.
+	 *
+	 * @param entry the classified journal entry to render
+	 * @throws Exception if the query or the template merge fails
+	 */
+	private void display(Entry entry) throws Exception {
+
+		if (entry.getQuery() != null) {
+			// Run the where clause against just this entry; an empty result
+			// means the entry was filtered out.
+			List list = Collections.singletonList(entry);
+			List results = entry.getQuery().execute(list).getResults();
+			if (results.isEmpty()) {
+				return;
+			}
+		}
+
+		// The template sources are handed to the resource loader for the
+		// duration of the merge only.
+		CustomResourceLoader.setResources(resources);
+		try {
+
+			context.put("location", entry.getLocation());
+			context.put("record", entry.getRecord());
+			context.put("type", entry.getType());
+			if (entry.getRecord() instanceof ActiveMQMessage) {
+				context.put("body", new MessageBodyFormatter(
+						(ActiveMQMessage) entry.getRecord()));
+			} else {
+				// The context is reused for every record; drop the body of
+				// the previously rendered message so it cannot leak into the
+				// output of a non-message record.
+				context.remove("body");
+			}
+
+			Template template = velocity.getTemplate(entry.getFormater());
+			// The writer wraps System.out, so it is flushed but never closed.
+			PrintWriter writer = new PrintWriter(System.out);
+			template.merge(context, writer);
+			writer.println();
+			writer.flush();
+		} finally {
+			CustomResourceLoader.setResources(null);
+		}
+	}
+
+	public void setMessageFormat(String messageFormat) {
+		this.messageFormat = messageFormat;
+	}
+
+	public void setTopicAckFormat(String ackFormat) {
+		this.topicAckFormat = ackFormat;
+	}
+
+	public void setTransactionFormat(String transactionFormat) {
+		this.transactionFormat = transactionFormat;
+	}
+
+	public void setTraceFormat(String traceFormat) {
+		this.traceFormat = traceFormat;
+	}
+
+	public void setUnknownFormat(String unknownFormat) {
+		this.unknownFormat = unknownFormat;
+	}
+
+	public void setQueueAckFormat(String queueAckFormat) {
+		this.queueAckFormat = queueAckFormat;
+	}
+
+	public String getQuery() {
+		return where;
+	}
+
+	public void setWhere(String query) {
+		this.where = query;
+	}
+
+	public boolean isHelp() {
+		return help;
+	}
+
+	public void setHelp(boolean help) {
+		this.help = help;
+	}
+
+	/**
+	 * @return the dirs
+	 */
+	public ArrayList<File> getDirs() {
+		return dirs;
+	}
+
+}

Added: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalToolCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalToolCommand.java?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalToolCommand.java (added)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/AMQJournalToolCommand.java Wed Sep 23 19:06:58 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.activemq.console.CommandContext;
+import org.apache.activemq.console.command.Command;
+
+/**
+ * Console command that runs the {@link AMQJournalTool} with the tokens it is
+ * given.
+ */
+public class AMQJournalToolCommand implements Command {
+
+	private CommandContext context;
+
+	/**
+	 * Applies the option tokens to a new {@link AMQJournalTool}, registers the
+	 * remaining tokens as journal directories and executes the tool.
+	 *
+	 * @param tokens the command arguments: options and journal directories
+	 * @throws Exception if the tool fails
+	 */
+	public void execute(List<String> tokens) throws Exception {
+		AMQJournalTool journalTool = new AMQJournalTool();
+		String[] arguments = tokens.toArray(new String[tokens.size()]);
+		String[] leftovers = CommandLineSupport.setOptions(journalTool, arguments);
+		for (String path : leftovers) {
+			journalTool.getDirs().add(new File(path));
+		}
+		journalTool.execute();
+	}
+
+	/**
+	 * Stores the command context supplied by the console.
+	 *
+	 * @param context the command context
+	 */
+	public void setCommandContext(CommandContext context) {
+		this.context = context;
+	}
+
+}

Added: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/CommandLineSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/CommandLineSupport.java?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/CommandLineSupport.java (added)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/CommandLineSupport.java Wed Sep 23 19:06:58 2009
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq;
+
+import java.util.ArrayList;
+
+import org.apache.activemq.util.IntrospectionSupport;
+
+/**
+ * Helper utility that sets the properties of any object from command line
+ * arguments.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public final class CommandLineSupport {
+
+    private CommandLineSupport() {
+    }
+
+    /**
+     * Sets the properties of an object from the given command line arguments.
+     * 
+     * If args contains: --ack-mode=AUTO --url=tcp://localhost:61616 --persistent
+     * 
+     * then it will try to call the following setters on the target object:
+     * 
+     * target.setAckMode("AUTO");
+     * target.setURL(new URI("tcp://localhost:61616") );
+     * target.setPersistent(true);
+     * 
+     * Notice that the proper conversion for the argument is determined by
+     * examining the setter argument type.
+     * 
+     * @param target the object that will have its properties set
+     * @param args the command line options
+     * @return any arguments that are not valid options for the target
+     */
+    public static String[] setOptions(Object target, String[] args) {
+        ArrayList<String> unused = new ArrayList<String>();
+
+        for (String arg : args) {
+            if (arg == null) {
+                continue;
+            }
+
+            // Anything that is not a --option is handed back untouched.
+            if (!arg.startsWith("--")) {
+                unused.add(arg);
+                continue;
+            }
+
+            // --options without a specified value are considered boolean
+            // flags that are enabled.
+            String name = arg.substring(2);
+            String value = "true";
+
+            // --option=value case
+            int eq = name.indexOf("=");
+            if (eq > 0) {
+                value = name.substring(eq + 1);
+                name = name.substring(0, eq);
+            }
+
+            // An empty name, or a name the target has no property for, makes
+            // it an unrecognized option.
+            if (name.length() == 0
+                    || !IntrospectionSupport.setProperty(target, convertOptionToPropertyName(name), value)) {
+                unused.add(arg);
+            }
+        }
+
+        return unused.toArray(new String[unused.size()]);
+    }
+
+    /**
+     * Converts strings like test-enabled to testEnabled.
+     * 
+     * @param name the option name as typed on the command line
+     * @return the matching property name
+     */
+    private static String convertOptionToPropertyName(String name) {
+        StringBuilder converted = new StringBuilder();
+        String rest = name;
+
+        // Strip each '-' (a leading one is left alone) and upper-case the
+        // character that follows it.
+        for (int dash = rest.indexOf("-"); dash > 0; dash = rest.indexOf("-")) {
+            converted.append(rest.substring(0, dash));
+            rest = rest.substring(dash + 1);
+
+            if (rest.length() > 0) {
+                converted.append(rest.substring(0, 1).toUpperCase());
+                rest = rest.substring(1);
+            }
+        }
+        return converted.append(rest).toString();
+    }
+}

Added: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/CustomResourceLoader.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/CustomResourceLoader.java?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/CustomResourceLoader.java (added)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/CustomResourceLoader.java Wed Sep 23 19:06:58 2009
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+
+import org.apache.commons.collections.ExtendedProperties;
+import org.apache.velocity.exception.ResourceNotFoundException;
+import org.apache.velocity.runtime.RuntimeServices;
+import org.apache.velocity.runtime.resource.Resource;
+import org.apache.velocity.runtime.resource.loader.FileResourceLoader;
+import org.apache.velocity.runtime.resource.loader.ResourceLoader;
+
+/**
+ * Velocity resource loader that first looks a template up in a per-thread map
+ * of template name to template source, and otherwise delegates to a
+ * {@link FileResourceLoader}.
+ */
+public class CustomResourceLoader extends ResourceLoader {
+
+	private final static ThreadLocal<HashMap<String, String>> resourcesTL = new ThreadLocal<HashMap<String, String>>();
+	private final FileResourceLoader fileResourceLoader = new FileResourceLoader();
+
+	/**
+	 * Initializes this loader and the wrapped file loader with the same
+	 * runtime services and configuration.
+	 */
+	@Override
+	public void commonInit(RuntimeServices rs, ExtendedProperties configuration) {
+		super.commonInit(rs, configuration);
+		fileResourceLoader.commonInit(rs, configuration);
+	}
+
+	/**
+	 * Passes the configuration on to the wrapped file loader.
+	 */
+	public void init(ExtendedProperties configuration) {
+		fileResourceLoader.init(configuration);
+	}
+
+	/**
+	 * Returns the source of the named template.
+	 *
+	 * @param name the template name
+	 * @return a stream over the in-memory template registered under that name
+	 *         for the current thread, or whatever the file loader returns
+	 *         when no such template is registered
+	 * @throws ResourceNotFoundException if no name is given or the template
+	 *             cannot be provided
+	 */
+	public synchronized InputStream getResourceStream(String name)
+			throws ResourceNotFoundException {
+		if (name == null || name.length() == 0) {
+			throw new ResourceNotFoundException("No template name provided");
+		}
+
+		HashMap<String, String> registered = resourcesTL.get();
+		String source = registered != null ? registered.get(name) : null;
+
+		if (source == null) {
+			return this.fileResourceLoader.getResourceStream(name);
+		}
+
+		try {
+			// getBytes() encodes with the platform default charset.
+			return new ByteArrayInputStream(source.getBytes());
+		} catch (Exception e) {
+			throw new ResourceNotFoundException(e.getMessage());
+		}
+	}
+
+	/**
+	 * @return always false
+	 */
+	public boolean isSourceModified(Resource resource) {
+		return false;
+	}
+
+	/**
+	 * @return always 0
+	 */
+	public long getLastModified(Resource resource) {
+		return 0;
+	}
+
+	/**
+	 * @return the template map registered for the current thread, or null
+	 */
+	static public HashMap<String, String> getResources() {
+		return resourcesTL.get();
+	}
+
+	/**
+	 * Registers the template map for the current thread.
+	 *
+	 * @param arg0 the template name to template source map, or null to clear
+	 */
+	static public void setResources(HashMap<String, String> arg0) {
+		resourcesTL.set(arg0);
+	}
+
+}

Added: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/Entry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/Entry.java?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/Entry.java (added)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/Entry.java Wed Sep 23 19:06:58 2009
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq;
+
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.util.ByteSequence;
+import org.josql.Query;
+
+/**
+ * Bean describing one journal record: where it was read from, the decoded
+ * record, its raw bytes, the display type, the name of the formatter to
+ * render it with and an optional query used to filter it.
+ */
+public class Entry {
+
+	Location location;
+	DataStructure record;
+	private ByteSequence data;
+	private String type;
+	private String formater;
+	private Query query;
+
+	/** @return the journal location of the record */
+	public Location getLocation() {
+		return location;
+	}
+
+	/** @param location the journal location of the record */
+	public void setLocation(Location location) {
+		this.location = location;
+	}
+
+	/** @return the decoded record */
+	public DataStructure getRecord() {
+		return record;
+	}
+
+	/** @param record the decoded record */
+	public void setRecord(DataStructure record) {
+		this.record = record;
+	}
+
+	/** @return the raw bytes of the record */
+	public ByteSequence getData() {
+		return data;
+	}
+
+	/** @param data the raw bytes of the record */
+	public void setData(ByteSequence data) {
+		this.data = data;
+	}
+
+	/** @return the display type of the record */
+	public String getType() {
+		return type;
+	}
+
+	/** @param type the display type of the record */
+	public void setType(String type) {
+		this.type = type;
+	}
+
+	/** @return the name of the formatter used to render the record */
+	public String getFormater() {
+		return formater;
+	}
+
+	/** @param formater the name of the formatter used to render the record */
+	public void setFormater(String formater) {
+		this.formater = formater;
+	}
+
+	/** @return the query used to filter the record, may be null */
+	public Query getQuery() {
+		return query;
+	}
+
+	/** @param query the query used to filter the record, may be null */
+	public void setQuery(Query query) {
+		this.query = query;
+	}
+
+}

Added: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/MessageBodyFormatter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/MessageBodyFormatter.java?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/MessageBodyFormatter.java (added)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/MessageBodyFormatter.java Wed Sep 23 19:06:58 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.util.ByteSequence;
+
+public class MessageBodyFormatter {
+	final ActiveMQMessage message;
+	
+	public MessageBodyFormatter(ActiveMQMessage message) {
+		this.message=message;
+	}
+
+	@Override
+	public String toString() {
+		try {
+			switch (message.getDataStructureType()) {
+			case ActiveMQMessage.DATA_STRUCTURE_TYPE:
+				return "";
+			case ActiveMQBlobMessage.DATA_STRUCTURE_TYPE:
+				ActiveMQBlobMessage blob = (ActiveMQBlobMessage) message;
+				return blob.getRemoteBlobUrl();
+			case ActiveMQMapMessage.DATA_STRUCTURE_TYPE:
+				ActiveMQMapMessage map = (ActiveMQMapMessage)message;
+				return map.getContentMap().toString();			
+			case ActiveMQTextMessage.DATA_STRUCTURE_TYPE:
+				ActiveMQTextMessage text = (ActiveMQTextMessage)message;
+				return text.getText();
+			case ActiveMQBytesMessage.DATA_STRUCTURE_TYPE:
+			case ActiveMQObjectMessage.DATA_STRUCTURE_TYPE:
+			case ActiveMQStreamMessage.DATA_STRUCTURE_TYPE:
+				ByteSequence data = message.getContent();
+				return "binary payload {length="+data.getLength()+", compressed="+message.isCompressed()+"}";
+			}
+		} catch (JMSException e) {
+		}
+		return "";
+	}
+}

Added: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQIterator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQIterator.java?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQIterator.java (added)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQIterator.java Wed Sep 23 19:06:58 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq.reader;
+
+import java.util.Iterator;
+
+import javax.jms.Message;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+
+/**
+ * An Iterator for the AMQReader.
+ * <p>
+ * Messages are pulled from the reader one at a time; when a selector
+ * expression is present only messages matching it are returned.
+ * {@link #hasNext()} looks ahead and buffers the next matching message, so it
+ * may be called any number of times without skipping messages.
+ */
+class AMQIterator  implements Iterator<Message>{
+    private final AMQReader reader;
+    private final BooleanExpression expression;
+    // Position of the last record read from the reader. The reader reuses and
+    // mutates this object, so the message itself is buffered separately.
+    private MessageLocation cursor;
+    // Message found by hasNext() that has not been handed out by next() yet.
+    private Message pending;
+    // Set once the reader reported the end of the data, so it is never re-read.
+    private boolean exhausted;
+
+    /**
+     * @param reader the reader to pull messages from
+     * @param expression the selector to apply, or null to return every message
+     */
+    AMQIterator(AMQReader reader, BooleanExpression expression){
+        this.reader=reader;
+        this.expression=expression;
+    }
+
+    /**
+     * Advances to the next matching message unless one is already buffered.
+     *
+     * @return true if a further call to {@link #next()} will return a message
+     * @throws RuntimeException if reading or evaluating a message fails
+     */
+    public boolean hasNext() {
+        if (pending != null) {
+            return true;
+        }
+        if (exhausted) {
+            return false;
+        }
+        try {
+            MessageLocation found;
+            // Always continue from the last position read, including before
+            // the first next() call, so non-matching messages are skipped
+            // instead of being re-read forever.
+            while ((found = reader.getNextMessage(cursor)) != null) {
+                cursor = found;
+                Message candidate = found.getMessage();
+                if (candidate != null && (expression == null || matches(candidate))) {
+                    pending = candidate;
+                    return true;
+                }
+            }
+            exhausted = true;
+            return false;
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to get next message from reader ", e);
+        }
+    }
+
+    /**
+     * @return the next matching message, or null when there are no more
+     *         messages (kept for backwards compatibility instead of throwing
+     *         NoSuchElementException)
+     */
+    public Message next() {
+        if (!hasNext()) {
+            return null;
+        }
+        Message result = pending;
+        pending = null;
+        return result;
+    }
+
+    /**
+     * Removal is not supported.
+     *
+     * @throws IllegalStateException always
+     */
+    public void remove() {
+        throw new IllegalStateException("Not supported");
+    }
+
+    /** Evaluates the selector expression against a single message. */
+    private boolean matches(Message candidate) throws Exception {
+        MessageEvaluationContext context = new MessageEvaluationContext();
+        context.setMessageReference((MessageReference) candidate);
+        return expression.matches(context);
+    }
+
+}

Added: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQReader.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQReader.java?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQReader.java (added)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/AMQReader.java Wed Sep 23 19:06:58 2009
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq.reader;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.Message;
+
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Reads and iterates through data log files for the AMQMessage Store.
+ * <p>
+ * NOTE(review): the AsyncDataManager started by the constructor is not closed
+ * by this class.
+ */
+public class AMQReader implements Iterable<Message> {
+
+    private AsyncDataManager dataManager;
+    private WireFormat wireFormat = new OpenWireFormat();
+    // Only set when reading a single data log file rather than a directory.
+    private File file;
+    // Parsed selector, or null to select every message.
+    private BooleanExpression expression;
+
+    /**
+     * List all the data files in a directory
+     * 
+     * @param directory the directory to inspect
+     * @return a new set holding the data files reported by the data manager
+     * @throws IOException if the directory is null, missing or not a
+     *         directory, or if the data manager fails
+     */
+    public static Set<File> listDataFiles(File directory) throws IOException{
+        if (directory == null || !directory.exists() || !directory.isDirectory()) {
+            throw new IOException("Invalid Directory " + directory);
+        }
+        Set<File> result = new HashSet<File>();
+        AsyncDataManager dataManager = new AsyncDataManager();
+        dataManager.setDirectory(directory);
+        dataManager.start();
+        try {
+            Set<File> set = dataManager.getFiles();
+            if (set != null) {
+                result.addAll(set);
+            }
+        } finally {
+            // Close the started manager even when listing the files fails.
+            dataManager.close();
+        }
+        return result;
+    }
+
+    /**
+     * Create the AMQReader to read a directory of amq data logs - or an
+     * individual data log file
+     * 
+     * @param file the directory - or file
+     * @throws IOException
+     * @throws InvalidSelectorException
+     */
+    public AMQReader(File file) throws InvalidSelectorException, IOException {
+        this(file,null);
+    }
+    
+    /**
+     * Create the AMQReader to read a directory of amq data logs - or an
+     * individual data log file
+     * 
+     * @param file the directory - or file
+     * @param selector the JMS selector or null to select all; a blank
+     *        selector is treated like null
+     * @throws IOException
+     * @throws InvalidSelectorException if the selector cannot be parsed
+     */
+    public AMQReader(File file, String selector) throws IOException, InvalidSelectorException {
+        String str = selector != null ? selector.trim() : null;
+        if (str != null && str.length() > 0) {
+            this.expression=SelectorParser.parse(str);
+        }
+        dataManager = new AsyncDataManager();
+        dataManager.setArchiveDataLogs(false);
+        if (file.isDirectory()) {
+            dataManager.setDirectory(file);
+        } else {
+            dataManager.setDirectory(file.getParentFile());
+            dataManager.setDirectoryArchive(file);
+            this.file = file;
+        }
+        dataManager.start();
+    }
+
+    /**
+     * @return a new iterator over the messages, filtered by the selector
+     *         given at construction time (if any)
+     */
+    public Iterator<Message> iterator() {
+        return new AMQIterator(this,this.expression);
+    }
+
+    /**
+     * Finds the next message following the given position.
+     * 
+     * @param lastLocation the position to continue from, or null to start at
+     *        the beginning. When non-null this object is reused: its message
+     *        is cleared and, if another message is found, it is updated and
+     *        returned.
+     * @return the location of the next message, or null if there is none
+     * @throws IllegalStateException
+     * @throws IOException
+     */
+    protected MessageLocation getNextMessage(MessageLocation lastLocation)
+            throws IllegalStateException, IOException {
+        // this.file is null when reading a whole directory.
+        return getInternalNextMessage(this.file, lastLocation);
+    }
+
+    /**
+     * Walks the records after lastLocation until one that decodes to a
+     * message is found.
+     */
+    private MessageLocation getInternalNextMessage(File file,
+            MessageLocation lastLocation) throws IllegalStateException,
+            IOException {
+        MessageLocation result = lastLocation;
+        if (result != null) {
+            result.setMessage(null);
+        }
+        Location pos = lastLocation != null ? lastLocation.getLocation() : null;
+        while ((pos = getNextLocation(file, pos)) != null) {
+            Message message = getMessage(pos);
+            if (message != null) {
+                if (result == null) {
+                    result = new MessageLocation();
+                }
+                result.setMessage(message);
+                result.setLocation(pos);
+                return result;
+            }
+        }
+        // No further message. Nothing is dereferenced here, so an empty data
+        // log read with a null lastLocation simply yields null.
+        return null;
+    }
+
+    /** Asks the data manager for the record following the given one. */
+    private Location getNextLocation(File file, Location last)
+            throws IllegalStateException, IOException {
+        if (file != null) {
+            return dataManager.getNextLocation(file, last, true);
+        }
+        return dataManager.getNextLocation(last);
+    }
+
+    /**
+     * Reads and unmarshals the record at the given location.
+     * 
+     * @return the record as a message, or null if it is not a message
+     */
+    private Message getMessage(Location location) throws IOException {
+        ByteSequence data = dataManager.read(location);
+        DataStructure c = (DataStructure) wireFormat.unmarshal(data);
+        if (c instanceof Message) {
+            return (Message) c;
+        }
+        return null;
+    }
+}

Added: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/MessageLocation.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/MessageLocation.java?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/MessageLocation.java (added)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/store/amq/reader/MessageLocation.java Wed Sep 23 19:06:58 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq.reader;
+
+import javax.jms.Message;
+
+import org.apache.activemq.kaha.impl.async.Location;
+
+/**
+ * Simple mutable pair of a message and the location it is associated with.
+ */
+class MessageLocation {
+    private Message message;
+    private Location location;
+
+    /**
+     * @return the message
+     */
+    public Message getMessage() {
+        return this.message;
+    }
+
+    /**
+     * @param message the message to hold
+     */
+    public void setMessage(Message message) {
+        this.message = message;
+    }
+
+    /**
+     * @return the location
+     */
+    public Location getLocation() {
+        return this.location;
+    }
+
+    /**
+     * @param location the location to hold
+     */
+    public void setLocation(Location location) {
+        this.location = location;
+    }
+
+}

Added: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/DiscoveryRegistryServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/DiscoveryRegistryServlet.java?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/DiscoveryRegistryServlet.java (added)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/DiscoveryRegistryServlet.java Wed Sep 23 19:06:58 2009
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.discovery.http;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * In-memory service registry exposed over HTTP. The request path info names a
+ * service group. PUT registers (or refreshes) the service given in the
+ * "service" request header, DELETE removes it, and GET lists the services of
+ * the group that were refreshed recently enough, one per line.
+ */
+public class DiscoveryRegistryServlet extends HttpServlet {
+    
+    private static final Log LOG = LogFactory.getLog(DiscoveryRegistryServlet.class);
+    long maxKeepAge = 1000*60*60; // 1 hour.
+    // group -> (service -> time of last registration in milliseconds)
+    ConcurrentHashMap<String, ConcurrentHashMap<String, Long>> serviceGroups = new ConcurrentHashMap<String, ConcurrentHashMap<String, Long>>();
+    
+    /**
+     * Registers the service named by the "service" header in the group named
+     * by the path info, stamping it with the current time. Responds with 400
+     * if either value is missing.
+     */
+    @Override
+    protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+        String group = req.getPathInfo();
+        String service = req.getHeader("service");
+        LOG.debug("Registering: group="+group+", service="+service);
+        
+        // ConcurrentHashMap rejects null keys, so report a client error instead.
+        if( group == null || service == null ) {
+            resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "A group path and a 'service' header are required");
+            return;
+        }
+        ConcurrentHashMap<String, Long> services = getServiceGroup(group);
+        services.put(service, System.currentTimeMillis());
+    }
+
+    /**
+     * Returns the service map of a group, creating it atomically if needed.
+     */
+    private ConcurrentHashMap<String, Long> getServiceGroup(String group) {
+        ConcurrentHashMap<String, Long> rc = serviceGroups.get(group);
+        if( rc == null ) {
+            ConcurrentHashMap<String, Long> created = new ConcurrentHashMap<String, Long>();
+            // putIfAbsent keeps the map of a concurrent registration, if any.
+            rc = serviceGroups.putIfAbsent(group, created);
+            if( rc == null ) {
+                rc = created;
+            }
+        }
+        return rc;
+    }
+
+    /**
+     * Lists the services of the group whose last registration is newer than
+     * the "freshness" request parameter (milliseconds, default 30 seconds).
+     * Entries older than maxKeepAge are dropped while listing. An unknown
+     * group yields an empty listing; a missing group path or a malformed
+     * freshness value yields 400.
+     */
+    @Override
+    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+        try {
+            long freshness = 1000*30;
+            String p = req.getParameter("freshness");
+            if( p!=null ) {
+                try {
+                    freshness = Long.parseLong(p);
+                } catch (NumberFormatException e) {
+                    resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid freshness: "+p);
+                    return;
+                }
+            }
+            
+            String group = req.getPathInfo();
+            LOG.debug("group="+group);
+            if( group == null ) {
+                resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "A group path is required");
+                return;
+            }
+            PrintWriter writer = resp.getWriter();
+            // Look the group up without creating it, so arbitrary GET paths
+            // cannot grow the registry.
+            ConcurrentHashMap<String, Long> services = serviceGroups.get(group);
+            if( services == null ) {
+                return;
+            }
+            
+            long now = System.currentTimeMillis();
+            long dropTime = now-maxKeepAge;             
+            long minimumTime = now-freshness;
+            
+            for (Map.Entry<String, Long> entry : services.entrySet()) {
+                Long lastSeen = entry.getValue();
+                if( lastSeen > minimumTime ) {
+                    writer.println(entry.getKey());
+                } else if( lastSeen < dropTime ) {
+                    // We might as well get rid of the really old entries, but
+                    // only if the service was not re-registered meanwhile.
+                    services.remove(entry.getKey(), lastSeen);
+                }
+            }
+            
+        } catch (Exception e) {
+            resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Error occured: "+e);
+        }
+    }
+    
+    /**
+     * Removes the service named by the "service" header from the group named
+     * by the path info. Unknown groups and services are ignored; a missing
+     * group path or header yields 400.
+     */
+    @Override
+    protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+        String group = req.getPathInfo();
+        String service = req.getHeader("service");
+        LOG.debug("Unregistering: group="+group+", service="+service);
+        
+        if( group == null || service == null ) {
+            resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "A group path and a 'service' header are required");
+            return;
+        }
+        ConcurrentHashMap<String, Long> services = serviceGroups.get(group);
+        if( services != null ) {
+            services.remove(service);
+        }
+    }
+        
+}

Added: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java (added)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java Wed Sep 23 19:06:58 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.discovery.http;
+
+import java.net.URI;
+
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+
+public class EmbeddedJettyServer implements org.apache.activemq.Service {
+
+    private HTTPDiscoveryAgent agent;
+    private Server server;
+    private SelectChannelConnector connector;
+    private DiscoveryRegistryServlet camelServlet = new DiscoveryRegistryServlet();
+    
+    public void start() throws Exception {
+        URI uri = new URI(agent.getRegistryURL());
+
+        server = new Server();
+        Context context = new Context(Context.NO_SECURITY | Context.NO_SESSIONS);
+        
+        context.setContextPath("/");
+        ServletHolder holder = new ServletHolder();
+        holder.setServlet(camelServlet);
+        context.addServlet(holder, "/*");
+        server.setHandler(context);
+        server.start();
+        
+        int port = 80;
+        if( uri.getPort() >=0 ) {
+            port = uri.getPort();
+        }
+        
+        connector = new SelectChannelConnector();
+        connector.setPort(port);
+        server.addConnector(connector);
+        connector.start();
+    }
+
+    public void stop() throws Exception {
+        if( connector!=null ) {
+            connector.stop();
+            connector = null;
+        }
+        if( server!=null ) {
+            server.stop();
+            server = null;
+        }
+    }
+
+    public HTTPDiscoveryAgent getAgent() {
+        return agent;
+    }
+
+    public void setAgent(HTTPDiscoveryAgent agent) {
+        this.agent = agent;
+    }
+    
+
+}

Added: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java (added)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java Wed Sep 23 19:06:58 2009
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.discovery.http;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.command.DiscoveryEvent;
+import org.apache.activemq.transport.discovery.DiscoveryAgent;
+import org.apache.activemq.transport.discovery.DiscoveryListener;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.DeleteMethod;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PutMethod;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class HTTPDiscoveryAgent implements DiscoveryAgent {
+    
+    private static final Log LOG = LogFactory.getLog(HTTPDiscoveryAgent.class);
+    
+    private String registryURL = "http://localhost:8080/discovery-registry/default";
+    private HttpClient httpClient = new HttpClient();
+    private AtomicBoolean running=new AtomicBoolean();
+    private final AtomicReference<DiscoveryListener> discoveryListener = new AtomicReference<DiscoveryListener>();
+    private final HashSet<String> registeredServices = new HashSet<String>();
+    private final HashMap<String, SimpleDiscoveryEvent> discoveredServices = new HashMap<String, SimpleDiscoveryEvent>();    
+    private Thread thread;   
+    private long updateInterval = 1000*10;
+    private String brokerName;
+    private boolean startEmbeddRegistry=false;
+    private Service jetty;
+    private AtomicInteger startCounter=new AtomicInteger(0);
+
+    
+    private long initialReconnectDelay = 1000;
+    private long maxReconnectDelay = 1000 * 30;
+    private long backOffMultiplier = 2;
+    private boolean useExponentialBackOff=true;    
+    private int maxReconnectAttempts;
+    private final Object sleepMutex = new Object();
+    private long minConnectTime = 5000;
+    
+    /**
+     * Discovery event that additionally carries the reconnect bookkeeping
+     * read and updated by serviceFailed(). It is a non-static inner class:
+     * the initial reconnect delay is read from the enclosing agent.
+     */
+    class SimpleDiscoveryEvent extends DiscoveryEvent {
+
+        // Number of failures that happened shortly after the event was raised.
+        private int connectFailures;
+        // Time in milliseconds to wait before the event is raised again.
+        private long reconnectDelay = initialReconnectDelay;
+        // Time at which the event was created or last re-raised.
+        private long connectTime = System.currentTimeMillis();
+        // Set while a reported failure is being handled.
+        private AtomicBoolean failed = new AtomicBoolean(false);
+        // Checked by serviceFailed() to stop retrying; presumably set when the
+        // service disappears from the registry - TODO confirm against the caller.
+        private AtomicBoolean removed = new AtomicBoolean(false);
+
+        public SimpleDiscoveryEvent(String service) {
+            super(service);
+        }
+
+    }
+
+    
+    /**
+     * @return always null; this agent does not keep a group
+     */
+    public String getGroup() {
+        return null;
+    }
+
+    /**
+     * Remembers the service in the set of registered services and
+     * immediately sends a registration to the registry.
+     *
+     * @param service the service to register
+     */
+    public void registerService(String service) throws IOException {
+        synchronized(registeredServices) {
+            registeredServices.add(service);
+        }
+        // Performed outside the registeredServices lock.
+        doRegister(service);
+    }
+
+    /**
+     * Sends a PUT to the registry URL with the service in the "service"
+     * request header. Best effort: failures are only logged at debug level.
+     *
+     * @param service the service to register
+     */
+    synchronized private void doRegister(String service) {
+        String url = registryURL;
+        PutMethod method = null;
+        try {
+            method = new PutMethod(url);
+//            method.setParams(createParams());
+            method.setRequestHeader("service", service);
+            int responseCode = httpClient.executeMethod(method);
+            LOG.debug("PUT to "+url+" got a "+responseCode);
+        } catch (Exception e) {
+            LOG.debug("PUT to "+url+" failed with: "+e);
+        } finally {
+            // Always hand the connection back to the HttpClient.
+            if( method != null ) {
+                method.releaseConnection();
+            }
+        }
+    }
+    
+    /**
+     * Sends a DELETE to the registry URL with the service in the "service"
+     * request header. Best effort: failures are only logged at debug level.
+     *
+     * @param service the service to unregister
+     */
+    synchronized private void doUnRegister(String service) {
+        String url = registryURL;
+        DeleteMethod method = null;
+        try {
+            method = new DeleteMethod(url);
+//            method.setParams(createParams());
+            method.setRequestHeader("service", service);
+            int responseCode = httpClient.executeMethod(method);
+            LOG.debug("DELETE to "+url+" got a "+responseCode);
+        } catch (Exception e) {
+            LOG.debug("DELETE to "+url+" failed with: "+e);
+        } finally {
+            // Always hand the connection back to the HttpClient.
+            if( method != null ) {
+                method.releaseConnection();
+            }
+        }
+    }
+
+//    private HttpMethodParams createParams() {
+//        HttpMethodParams params = new HttpMethodParams();
+//        params.setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(0,false));
+//        return params;
+//    }
+    
+    /**
+     * Sends a GET to the registry URL and parses the response body as one
+     * service per line, ignoring blank lines.
+     *
+     * @param freshness passed to the registry as the "freshness" query parameter
+     * @return the services listed by the registry, or null if the request
+     *         failed or did not answer with status 200
+     */
+    synchronized private Set<String> doLookup(long freshness) {
+        String url = registryURL+"?freshness="+freshness;
+        GetMethod method = null;
+        try {
+            method = new GetMethod(url);
+//            method.setParams(createParams());
+            int responseCode = httpClient.executeMethod(method);
+            LOG.debug("GET to "+url+" got a "+responseCode);
+            if( responseCode == 200 ) {
+                Set<String> rc = new HashSet<String>();
+                Scanner scanner = new Scanner(method.getResponseBodyAsStream());
+                while( scanner.hasNextLine() ) {
+                    String service = scanner.nextLine();
+                    if( service.trim().length() != 0 ) {
+                        rc.add(service);
+                    }
+                }
+                return rc;
+            } else {
+                LOG.debug("GET to "+url+" failed with response code: "+responseCode);
+                return null;
+            }
+        } catch (Exception e) {
+            LOG.debug("GET to "+url+" failed with: "+e);
+            return null;
+        } finally {
+            // The body has been fully read (or abandoned) by now; always hand
+            // the connection back to the HttpClient.
+            if( method != null ) {
+                method.releaseConnection();
+            }
+        }
+    }
+
+    /**
+     * Handles a failure reported for a previously discovered service: the
+     * listener is told the service was removed and, unless the event is
+     * flagged as removed, a daemon thread is started that raises the event
+     * again later.
+     * <p>
+     * A failure that occurs within minConnectTime of the event's connect time
+     * counts as a connection failure: the failure counter is incremented, the
+     * thread gives up once maxReconnectAttempts (if greater than zero) is
+     * reached, and otherwise waits for the event's reconnect delay before the
+     * delay is recomputed for the next attempt. Any later failure resets the
+     * counter and the delay.
+     *
+     * @param devent the failed event; it is cast to SimpleDiscoveryEvent
+     */
+    public void serviceFailed(DiscoveryEvent devent) throws IOException {
+
+        final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
+        // Only the first report is processed until the failed flag is cleared again.
+        if (event.failed.compareAndSet(false, true)) {
+        	discoveryListener.get().onServiceRemove(event);
+        	if(!event.removed.get()) {
+	        	// Setup a thread to re-raise the event...
+	            Thread thread = new Thread() {
+	                public void run() {
+	
+	                    // We detect a failed connection attempt because the service
+	                    // fails right away.
+	                    if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
+	                        LOG.debug("Failure occured soon after the discovery event was generated.  It will be clasified as a connection failure: "+event);
+	
+	                        event.connectFailures++;
+	
+	                        // A maxReconnectAttempts of zero or less never gives up.
+	                        if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
+	                            LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries.  Reconnecting has been disabled.");
+	                            return;
+	                        }
+	
+	                        // Waiting on the mutex (rather than sleeping) allows the wait
+	                        // to be cut short by a notify on sleepMutex.
+	                        synchronized (sleepMutex) {
+	                            try {
+	                                if (!running.get() || event.removed.get()) {
+	                                    return;
+	                                }
+	                                LOG.debug("Waiting "+event.reconnectDelay+" ms before attepting to reconnect.");
+	                                sleepMutex.wait(event.reconnectDelay);
+	                            } catch (InterruptedException ie) {
+	                                Thread.currentThread().interrupt();
+	                                return;
+	                            }
+	                        }
+	
+	                        // Compute the delay to use for the next failure.
+	                        if (!useExponentialBackOff) {
+	                            event.reconnectDelay = initialReconnectDelay;
+	                        } else {
+	                            // Exponential increment of reconnect delay.
+	                            event.reconnectDelay *= backOffMultiplier;
+	                            if (event.reconnectDelay > maxReconnectDelay) {
+	                                event.reconnectDelay = maxReconnectDelay;
+	                            }
+	                        }
+	
+	                    } else {
+	                        // The service stayed up long enough: start over.
+	                        event.connectFailures = 0;
+	                        event.reconnectDelay = initialReconnectDelay;
+	                    }
+	
+	                    // Re-check, the agent may have stopped or the service may
+	                    // have been removed while waiting.
+	                    if (!running.get() || event.removed.get()) {
+	                        return;
+	                    }
+	
+	                    event.connectTime = System.currentTimeMillis();
+	                    event.failed.set(false);
+	                    discoveryListener.get().onServiceAdd(event);
+	                }
+	            };
+	            thread.setDaemon(true);
+	            thread.start();
+        	}
+        }
+    }
+
+
+    /**
+     * Stores the broker name on this agent.
+     *
+     * @param brokerName the value assigned to the brokerName field
+     */
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    /**
+     * Replaces the listener that is notified through onServiceAdd / onServiceRemove when services
+     * appear, disappear or fail.
+     *
+     * @param discoveryListener the listener to store in the listener holder
+     */
+    public void setDiscoveryListener(DiscoveryListener discoveryListener) {
+        this.discoveryListener.set(discoveryListener);
+    }
+
+    /**
+     * Accepts a group name but does nothing with it; the method body is empty.
+     *
+     * @param group ignored
+     */
+    public void setGroup(String group) {
+    }
+
+    /**
+     * Starts the agent. Calls are counted through startCounter and only the call that moves the
+     * counter to 1 does any work: it optionally creates and starts the embedded registry server
+     * (when startEmbeddRegistry is set), marks the agent as running, and launches a daemon thread
+     * that calls update() and then sleeps for updateInterval milliseconds, until the agent stops.
+     *
+     * @throws Exception if the embedded registry server cannot be created or started
+     */
+    public void start() throws Exception {
+        if( startCounter.addAndGet(1)==1 ) {
+            if( startEmbeddRegistry ) {
+                jetty = createEmbeddedJettyServer();
+                // Hand this agent to the server through its "agent" property.
+                Map props = new HashMap();
+                props.put("agent", this);
+                IntrospectionSupport.setProperties(jetty, props);
+                jetty.start();
+            }
+
+            running.set(true);
+            thread = new Thread("HTTPDiscovery Agent") {
+                @Override
+                public void run() {
+                    while(running.get()) {
+                        try {
+                            update();
+                        } catch (RuntimeException e) {
+                            // One failed cycle (for example a listener callback that throws) must
+                            // not kill the polling thread, or discovery would silently stop.
+                            LOG.debug("Discovery update failed with: "+e);
+                        }
+                        try {
+                            Thread.sleep(updateInterval);
+                        } catch (InterruptedException e) {
+                            // Keep the interrupt status visible, as serviceFailed() does.
+                            Thread.currentThread().interrupt();
+                            return;
+                        }
+                    }
+                }
+            };
+            thread.setDaemon(true);
+            thread.start();
+        }
+    }
+
+    /**
+     * Create the EmbeddedJettyServer instance via reflection so that we can avoid a hard runtime dependency on 
+     * jetty.
+     * 
+     * @return
+     * @throws Exception
+     */
+    private Service createEmbeddedJettyServer()  throws Exception {
+        Class clazz = HTTPDiscoveryAgent.class.getClassLoader().loadClass("org.apache.activemq.transport.discovery.http.EmbeddedJettyServer");
+        return (Service)clazz.newInstance();
+    }
+
+    private void update() {
+        // Register all our services...
+        synchronized(registeredServices) {
+            for (String service : registeredServices) {
+                doRegister(service);
+            }
+        }
+        
+        // Find new registered services...
+        DiscoveryListener discoveryListener = this.discoveryListener.get();
+        if(discoveryListener!=null) {
+            Set<String> activeServices = doLookup(updateInterval*3);
+            // If there is error talking the the central server, then activeServices == null
+            if( activeServices !=null ) {
+                synchronized(discoveredServices) {
+                    
+                    HashSet<String> removedServices = new HashSet<String>(discoveredServices.keySet());
+                    removedServices.removeAll(activeServices);
+                    
+                    HashSet<String> addedServices = new HashSet<String>(activeServices);
+                    addedServices.removeAll(discoveredServices.keySet());
+                    addedServices.removeAll(removedServices);
+                    
+                    for (String service : addedServices) {
+                        SimpleDiscoveryEvent e = new SimpleDiscoveryEvent(service);
+                        discoveredServices.put(service, e);
+                        discoveryListener.onServiceAdd(e);
+                    }
+                    
+                    for (String service : removedServices) {
+                    	SimpleDiscoveryEvent e = discoveredServices.remove(service);
+                    	if( e !=null ) {
+                    		e.removed.set(true);
+                    	}
+                        discoveryListener.onServiceRemove(e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Stops the agent once the start counter drops back to zero: clears the running flag, wakes
+     * any reconnect threads waiting on sleepMutex, waits up to three update intervals for the
+     * polling thread to finish, and stops the embedded registry server if one was started.
+     *
+     * @throws Exception if waiting for the polling thread is interrupted or the embedded server
+     *         fails to stop
+     */
+    public void stop() throws Exception {
+        if( startCounter.decrementAndGet()==0 ) {
+            running.set(false);
+            // Reconnect threads created by serviceFailed() wait on sleepMutex and re-check the
+            // running flag when they wake; notify them so they exit now instead of sitting out
+            // their full back-off delay.
+            synchronized (sleepMutex) {
+                sleepMutex.notifyAll();
+            }
+            if( thread!=null ) {
+                thread.join(updateInterval*3);
+                thread=null;
+            }
+            if( jetty!=null ) {
+                jetty.stop();
+                jetty = null;
+            }
+        }
+    }
+
+    /**
+     * @return the registry URL currently configured on this agent
+     */
+    public String getRegistryURL() {
+        return registryURL;
+    }
+
+    /**
+     * Sets the registry URL used by this agent.
+     *
+     * @param discoveryRegistryURL the value assigned to the registryURL field
+     */
+    public void setRegistryURL(String discoveryRegistryURL) {
+        this.registryURL = discoveryRegistryURL;
+    }
+
+    /**
+     * @return the pause between discovery cycles; the polling thread passes this value to
+     *         Thread.sleep, so it is in milliseconds
+     */
+    public long getUpdateInterval() {
+        return updateInterval;
+    }
+
+    /**
+     * Sets the pause between discovery cycles.
+     *
+     * @param updateInterval the interval; the polling thread passes it to Thread.sleep, so it is
+     *        in milliseconds
+     */
+    public void setUpdateInterval(long updateInterval) {
+        this.updateInterval = updateInterval;
+    }
+
+    /**
+     * @return true if start() should create and start the embedded registry server
+     */
+    public boolean isStartEmbeddRegistry() {
+        return startEmbeddRegistry;
+    }
+
+    /**
+     * Controls whether start() creates and starts the embedded registry server.
+     *
+     * @param startEmbeddRegistry true to start the embedded registry server with the agent
+     */
+    public void setStartEmbeddRegistry(boolean startEmbeddRegistry) {
+        this.startEmbeddRegistry = startEmbeddRegistry;
+    }
+
+}

Added: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgentFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgentFactory.java?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgentFactory.java (added)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgentFactory.java Wed Sep 23 19:06:58 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.discovery.http;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.activemq.transport.discovery.DiscoveryAgent;
+import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+
+/**
+ * Factory that builds an {@link HTTPDiscoveryAgent} from a discovery URI. The URI without its
+ * query string becomes the agent's registry URL; the query parameters are applied to the agent
+ * as properties.
+ */
+public class HTTPDiscoveryAgentFactory extends DiscoveryAgentFactory {
+
+    /**
+     * Creates and configures an HTTP discovery agent for the given URI.
+     *
+     * @param uri the discovery URI, optionally carrying agent properties as query parameters
+     * @return the configured agent
+     * @throws IOException wrapping any failure raised while parsing the URI or configuring the agent
+     */
+    protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws IOException {
+        try {
+            // Split the URI: query parameters configure the agent, the remainder is the registry address.
+            final Map agentProperties = URISupport.parseParamters(uri);
+            uri = URISupport.removeQuery(uri);
+
+            final HTTPDiscoveryAgent agent = new HTTPDiscoveryAgent();
+            agent.setRegistryURL(uri.toString());
+            IntrospectionSupport.setProperties(agent, agentProperties);
+            return agent;
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create("Could not create discovery agent: " + uri, e);
+        }
+    }
+}

Propchange: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgentFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/http
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/http?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/http (added)
+++ activemq/trunk/activemq-optional/src/main/resources/META-INF/services/org/apache/activemq/transport/discoveryagent/http Wed Sep 23 19:06:58 2009
@@ -0,0 +1,4 @@
+## ---------------------------------------------------------------------------
+## 
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.discovery.http.HTTPDiscoveryAgentFactory

Added: activemq/trunk/activemq-optional/src/main/resources/org/apache/activemq/store/amq/help.txt
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/resources/org/apache/activemq/store/amq/help.txt?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/main/resources/org/apache/activemq/store/amq/help.txt (added)
+++ activemq/trunk/activemq-optional/src/main/resources/org/apache/activemq/store/amq/help.txt Wed Sep 23 19:06:58 2009
@@ -0,0 +1,52 @@
+Usage:
+  java org.apache.activemq.store.amq.AMQJournalTool [options]* (directory) *
+  
+Displays the records stored in the Journal log files used by ActiveMQ.  This
+tool supports loading the journal data files from multiple directories.  Normally
+it is run against the journal archive directory and the active journal directory.
+
+This tool supports controlling the output format using Velocity [1] templates.  
+It also supports filtering out records using a SQL like WHERE syntax implemented
+using JoSQL.
+  
+Options to control output format:
+  
+Any valid Velocity Template Language (VTL) expression can be used to control the 
+display of the record.  
+  
+  --message-format=VTL          The format used to display message records. Message
+      records get created every time a producer sends a persistent message to the broker.
+      The message gets recorded in the journal even if its transaction is rolled back.
+      Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageId}|${record.properties}|${body}
+      
+  --topic-ack-format=VTL        The format used to display topic ack records.  A topic
+      ack records that a durable subscription for a topic has acknowledged a set of messages.
+      Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.clientId}|${record.subscritionName}|${record.messageId}
+      
+  --queue-ack-format=VTL        The format used to display queue ack records. A queue
+      ack records that a consumer for a queue has acknowledged a message.
+      Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.destination}|${record.messageAck.lastMessageId}
+      
+  --transaction-format=VTL      The format used to display transaction records. Transaction records
+      are used to record transaction related actions like commit and rollback.
+      Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.transactionId}
+      
+  --trace-format=VTL            The format used to display trace records.
+      Trace records are informational messages stored in the journal that assist in Auditing.  
+      For example a trace message is recorded whenever the broker is restarted or when the 
+      long term store is checkpointed.  
+      Default VTL: ${location.dataFileId},${location.offset}|${type}|${record.message}
+  
+Options to control the selection of records displayed:  
+  --where=VALUE                 The where clause used to control the records selected
+      for display.  It can select on all the fields available in the velocity context.
+      example:  --where="type='ActiveMQTextMessage' and location.dataFileId > 2"
+      
+Other Options:
+  --help                        Show this help screen.
+
+Example:
+
+  java org.apache.activemq.store.amq.AMQJournalTool /path/to/archive /path/to/journal
+  
+ 
\ No newline at end of file

Added: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/store/amq/reader/AMQReaderTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/store/amq/reader/AMQReaderTest.java?rev=818209&view=auto
==============================================================================
--- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/store/amq/reader/AMQReaderTest.java (added)
+++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/store/amq/reader/AMQReaderTest.java Wed Sep 23 19:06:58 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.amq.reader;
+
+import java.io.File;
+import java.util.Set;
+
+import javax.jms.Message;
+import junit.framework.TestCase;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+
+
+/**
+ * Exercises {@link AMQReader} against the journal data files stored in the "data" directory
+ * next to this package on the test classpath.
+ */
+public class AMQReaderTest extends TestCase {
+
+    /** Reads the whole data directory through one reader and checks every message is non-null. */
+    public void testIterateArchive() throws Exception{
+        assertAllMessagesNotNull(new AMQReader(dataDirectory()));
+    }
+
+    /**
+     * Reads each data file through its own reader. The name does not start with "test", so the
+     * JUnit 3 style runner does not pick this method up.
+     */
+    public void xtestIterateFile() throws Exception{
+        Set<File> dataFiles = AMQReader.listDataFiles(dataDirectory());
+        assertNotNull(dataFiles);
+        assertTrue(dataFiles.size() >0);
+        for (File dataFile: dataFiles) {
+            System.err.println("READING " + dataFile);
+            assertAllMessagesNotNull(new AMQReader(dataFile));
+        }
+    }
+
+    /** Resolves the "data" directory of this package from the classpath. */
+    private File dataDirectory() throws Exception {
+        String packagePath = getClass().getPackage().getName() + File.separator + "data";
+        packagePath = packagePath.replace('.', File.separatorChar);
+        return new ClassPathResource(packagePath).getFile();
+    }
+
+    /** Iterates the reader to the end, asserting that no message is null. */
+    private static void assertAllMessagesNotNull(AMQReader reader) {
+        for (Message message : reader) {
+            assertNotNull(message);
+        }
+    }
+
+}

Added: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-1
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-1?rev=818209&view=auto
==============================================================================
Binary file - no diff available.

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-1
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-2
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-2?rev=818209&view=auto
==============================================================================
Binary file - no diff available.

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-2
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-3
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-3?rev=818209&view=auto
==============================================================================
Binary file - no diff available.

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-3
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-4
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-4?rev=818209&view=auto
==============================================================================
Binary file - no diff available.

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-4
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-5
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-5?rev=818209&view=auto
==============================================================================
Binary file - no diff available.

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-5
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-6
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-6?rev=818209&view=auto
==============================================================================
Binary file - no diff available.

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-6
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-7
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-7?rev=818209&view=auto
==============================================================================
Binary file - no diff available.

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-7
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-8
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-8?rev=818209&view=auto
==============================================================================
Binary file - no diff available.

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-8
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-9
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-9?rev=818209&view=auto
==============================================================================
Binary file - no diff available.

Propchange: activemq/trunk/activemq-optional/src/test/resources/org/apache/activemq/store/amq/reader/data/data-9
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream



Mime
View raw message