activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r433948 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/util/MulticastTraceBrokerPlugin.java broker/util/UDPTraceBrokerPlugin.java command/JournalTrace.java
Date Wed, 23 Aug 2006 06:58:41 GMT
Author: chirino
Date: Tue Aug 22 23:58:41 2006
New Revision: 433948

URL: http://svn.apache.org/viewvc?rev=433948&view=rev
Log:
Added UDP and multicast broker tracing plugins.  If enabled on a broker, they can be used
to monitor the broker without much network overhead.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/MulticastTraceBrokerPlugin.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java
  (with props)

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/MulticastTraceBrokerPlugin.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/MulticastTraceBrokerPlugin.java?rev=433948&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/MulticastTraceBrokerPlugin.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/MulticastTraceBrokerPlugin.java Tue Aug 22 23:58:41 2006
@@ -0,0 +1,61 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.MulticastSocket;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * A Broker interceptor which allows you to trace all operations to a Multicast socket.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision: 427613 $
+ */
+public class MulticastTraceBrokerPlugin extends UDPTraceBrokerPlugin {
+
+	private int timeToLive = 1;
+	
+	public MulticastTraceBrokerPlugin() {
+		try {
+			destination = new URI("multicast://224.1.2.3:61616");
+		} catch (URISyntaxException wontHappen) {
+		}
+	}
+	
+	protected DatagramSocket createSocket() throws IOException {
+        MulticastSocket s = new MulticastSocket();
+		s.setSendBufferSize(maxTraceDatagramSize);
+		s.setBroadcast(broadcast);
+        s.setLoopbackMode(true);
+        s.setTimeToLive(timeToLive);
+        return s;
+	}
+
+	public int getTimeToLive() {
+		return timeToLive;
+	}
+
+	public void setTimeToLive(int timeToLive) {
+		this.timeToLive = timeToLive;
+	}
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java?rev=433948&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java Tue Aug 22 23:58:41 2006
@@ -0,0 +1,192 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import org.apache.activeio.command.WireFormat;
+import org.apache.activeio.command.WireFormatFactory;
+import org.apache.activeio.packet.ByteSequence;
+import org.apache.activeio.util.ByteArrayOutputStream;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.JournalTrace;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+
+/**
+ * A Broker interceptor which allows you to trace all operations to a UDP socket.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision: 427613 $
+ */
+public class UDPTraceBrokerPlugin extends BrokerPluginSupport {
+
+	protected WireFormat wireFormat;
+	protected WireFormatFactory wireFormatFactory;
+	protected int maxTraceDatagramSize = 1024*4;
+	protected URI destination;
+	protected DatagramSocket socket;
+		
+	protected BrokerId brokerId;
+	protected SocketAddress address;
+	protected boolean broadcast;
+	
+	public UDPTraceBrokerPlugin() {
+		try {
+			destination = new URI("udp://127.0.0.1:61616");
+		} catch (URISyntaxException wontHappen) {
+		}
+	}
+
+	public void start() throws Exception {
+		super.start();
+		if( getWireFormat() == null )
+			throw new IllegalArgumentException("Wireformat must be specifed.");	
+		if( address == null ) {
+			address = createSocketAddress(destination);
+		}
+		socket = createSocket();
+		
+		brokerId = super.getBrokerId();
+		trace(new JournalTrace("START"));		
+	}	
+
+	protected DatagramSocket createSocket() throws IOException {
+		DatagramSocket s = new DatagramSocket();
+		s.setSendBufferSize(maxTraceDatagramSize);
+		s.setBroadcast(broadcast);
+		return s;
+	}
+
+	public void stop() throws Exception {
+		trace(new JournalTrace("STOP"));
+		socket.close();
+		super.stop();
+	}
+	
+	private void trace(DataStructure command) throws IOException {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream(maxTraceDatagramSize);
+		DataOutputStream out = new DataOutputStream(baos);
+		wireFormat.marshal(brokerId, out);
+		wireFormat.marshal(command, out);
+		out.close();
+		ByteSequence sequence = baos.toByteSequence();
+		DatagramPacket datagram = new DatagramPacket( sequence.getData(), sequence.getOffset(),
sequence.getLength(), address);		
+		socket.send(datagram);
+	}
+	
+    public void send(ConnectionContext context, Message messageSend) throws Exception {
+    	trace(messageSend);
+        super.send(context, messageSend);
+    }
+
+    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+    	trace(ack);
+        super.acknowledge(context, ack);
+    }
+
+	public WireFormat getWireFormat() {
+		if( wireFormat == null ) {
+			wireFormat = createWireFormat();
+		}
+		return wireFormat;
+	}
+
+	protected WireFormat createWireFormat() {
+		return getWireFormatFactory().createWireFormat();
+	}
+
+	public void setWireFormat(WireFormat wireFormat) {
+		this.wireFormat = wireFormat;
+	}
+
+	public WireFormatFactory getWireFormatFactory() {
+		if( wireFormatFactory == null ) {
+			wireFormatFactory = createWireFormatFactory();
+		}
+		return wireFormatFactory;
+	}
+
+	protected OpenWireFormatFactory createWireFormatFactory() {
+		OpenWireFormatFactory wf = new OpenWireFormatFactory();
+		wf.setCacheEnabled(false);
+		wf.setVersion(1);
+		wf.setTightEncodingEnabled(true);
+		wf.setSizePrefixDisabled(true);
+		return wf;
+	}
+
+	public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
+		this.wireFormatFactory = wireFormatFactory;
+	}
+
+
+	protected SocketAddress createSocketAddress(URI location) throws UnknownHostException {
+		InetAddress a = InetAddress.getByName(location.getHost());
+		int port = location.getPort();
+		return new InetSocketAddress(a, port);
+	}
+
+	public URI getDestination() {
+		return destination;
+	}
+
+	public void setDestination(URI destination) {
+		this.destination = destination;
+	}
+
+	public int getMaxTraceDatagramSize() {
+		return maxTraceDatagramSize;
+	}
+
+	public void setMaxTraceDatagramSize(int maxTraceDatagramSize) {
+		this.maxTraceDatagramSize = maxTraceDatagramSize;
+	}
+
+	public boolean isBroadcast() {
+		return broadcast;
+	}
+
+	public void setBroadcast(boolean broadcast) {
+		this.broadcast = broadcast;
+	}
+
+	public SocketAddress getAddress() {
+		return address;
+	}
+
+	public void setAddress(SocketAddress address) {
+		this.address = address;
+	}
+
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java?rev=433948&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java Tue Aug 22 23:58:41 2006
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.util.IntrospectionSupport;
+
+/**
+ * 
+ * @openwire:marshaller code="53"
+ * @version $Revision: 1.6 $
+ */
+public class JournalTrace implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.JOURNAL_TRACE;
+
+    /** Text carried by this trace record; remains null when none is supplied. */
+    private String message;
+
+    /**
+     * Creates a trace record without a message.
+     */
+    public JournalTrace() {
+        this((String) null);
+    }
+
+    /**
+     * Creates a trace record carrying the given message.
+     *
+     * @param message the text to carry
+     */
+    public JournalTrace(String message) {
+        this.message = message;
+    }
+
+    /**
+     * @return false, always
+     */
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getMessage() {
+        return message;
+    }
+
+    /**
+     * @return the string produced by {@link IntrospectionSupport} for this
+     *         instance, introspected as a JournalTrace
+     */
+    public String toString() {
+        final Class type = JournalTrace.class;
+        return IntrospectionSupport.toString(this, type);
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/JournalTrace.java
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message