Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Aug 8 11:56:59 2007
@@ -97,7 +97,8 @@
public class TransportConnection implements Service, Connection, Task, CommandVisitor {
private static final Log LOG = LogFactory.getLog(TransportConnection.class);
- private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName() + ".Transport");
+ private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName()
+ + ".Transport");
private static final Log SERVICELOG = LogFactory.getLog(TransportConnection.class.getName() + ".Service");
// Keeps track of the broker and connector that created this connection.
protected final Broker broker;
@@ -190,7 +191,8 @@
* @param taskRunnerFactory - can be null if you want direct dispatch to the
* transport else commands are sent async.
*/
- public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, TaskRunnerFactory taskRunnerFactory) {
+ public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
+ TaskRunnerFactory taskRunnerFactory) {
this.connector = connector;
this.broker = broker;
RegionBroker rb = (RegionBroker)broker.getAdaptor(RegionBroker.class);
@@ -270,7 +272,8 @@
else if (e.getClass() == BrokerStoppedException.class) {
if (!disposed.get()) {
if (SERVICELOG.isDebugEnabled())
- SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection.");
+ SERVICELOG
+ .debug("Broker has been stopped. Notifying client and closing his connection.");
ConnectionError ce = new ConnectionError();
ce.setException(e);
dispatchSync(ce);
@@ -400,7 +403,8 @@
}
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if (transactionState == null)
- throw new IllegalStateException("Cannot prepare a transaction that had not been started: " + info.getTransactionId());
+ throw new IllegalStateException("Cannot prepare a transaction that had not been started: "
+ + info.getTransactionId());
// Avoid dups.
if (!transactionState.isPrepared()) {
transactionState.setPrepared(true);
@@ -469,7 +473,8 @@
return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
}
- public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
+ public Response processMessageDispatchNotification(MessageDispatchNotification notification)
+ throws Exception {
broker.processDispatchNotification(notification);
return null;
}
@@ -498,7 +503,9 @@
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
if (ss == null)
- throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
+ throw new IllegalStateException(
+ "Cannot add a producer to a session that had not been registered: "
+ + sessionId);
// Avoid replaying dup commands
if (!ss.getProducerIds().contains(info.getProducerId())) {
broker.addProducer(cs.getContext(), info);
@@ -517,7 +524,9 @@
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
if (ss == null)
- throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " + sessionId);
+ throw new IllegalStateException(
+ "Cannot remove a producer from a session that had not been registered: "
+ + sessionId);
ProducerState ps = ss.removeProducer(id);
if (ps == null)
throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
@@ -532,7 +541,9 @@
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
if (ss == null)
- throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: " + sessionId);
+ throw new IllegalStateException(
+ "Cannot add a consumer to a session that had not been registered: "
+ + sessionId);
// Avoid replaying dup commands
if (!ss.getConsumerIds().contains(info.getConsumerId())) {
broker.addConsumer(cs.getContext(), info);
@@ -551,7 +562,9 @@
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
if (ss == null)
- throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId);
+ throw new IllegalStateException(
+ "Cannot remove a consumer from a session that had not been registered: "
+ + sessionId);
ConsumerState consumerState = ss.removeConsumer(id);
if (consumerState == null)
throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
@@ -628,7 +641,8 @@
if (state.getConnection() != this) {
LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
state.getConnection().stop();
- LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: " + state.getConnection().getRemoteAddress());
+ LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
+ + state.getConnection().getRemoteAddress());
state.setConnection(this);
state.reset(info);
}
@@ -751,7 +765,8 @@
}
protected void processDispatch(Command command) throws IOException {
- final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch() ? command : null);
+ final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch()
+ ? command : null);
try {
if (!disposed.get()) {
if (messageDispatch != null) {
@@ -831,7 +846,8 @@
transport.start();
if (taskRunnerFactory != null) {
- taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " + getRemoteAddress());
+ taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
+ + getRemoteAddress());
} else {
taskRunner = null;
}
@@ -1098,7 +1114,8 @@
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
Transport localTransport = TransportFactory.connect(uri);
Transport remoteBridgeTransport = new ResponseCorrelator(transport);
- duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport);
+ duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport,
+ remoteBridgeTransport);
// now turn duplex off this side
info.setDuplexConnection(false);
duplexBridge.setCreatedByDuplex(true);
@@ -1163,7 +1180,8 @@
ProducerState producerState = ss.getProducerState(id);
if (producerState != null && producerState.getInfo() != null) {
ProducerInfo info = producerState.getInfo();
- result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
+ result.setMutable(info.getDestination() == null
+ || info.getDestination().isComposite());
}
}
producerExchanges.put(id, result);
@@ -1267,7 +1285,8 @@
//
// /////////////////////////////////////////////////////////////////
- protected TransportConnectionState registerConnectionState(ConnectionId connectionId, TransportConnectionState state) {
+ protected TransportConnectionState registerConnectionState(ConnectionId connectionId,
+ TransportConnectionState state) {
TransportConnectionState rc = connectionState;
connectionState = state;
return rc;
@@ -1290,35 +1309,44 @@
protected TransportConnectionState lookupConnectionState(String connectionId) {
TransportConnectionState cs = connectionState;
if (cs == null)
- throw new IllegalStateException("Cannot lookup a connectionId for a connection that had not been registered: " + connectionId);
+ throw new IllegalStateException(
+ "Cannot lookup a connectionId for a connection that had not been registered: "
+ + connectionId);
return cs;
}
protected TransportConnectionState lookupConnectionState(ConsumerId id) {
TransportConnectionState cs = connectionState;
if (cs == null)
- throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: " + id.getParentId().getParentId());
+ throw new IllegalStateException(
+ "Cannot lookup a consumer from a connection that had not been registered: "
+ + id.getParentId().getParentId());
return cs;
}
protected TransportConnectionState lookupConnectionState(ProducerId id) {
TransportConnectionState cs = connectionState;
if (cs == null)
- throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: " + id.getParentId().getParentId());
+ throw new IllegalStateException(
+ "Cannot lookup a producer from a connection that had not been registered: "
+ + id.getParentId().getParentId());
return cs;
}
protected TransportConnectionState lookupConnectionState(SessionId id) {
TransportConnectionState cs = connectionState;
if (cs == null)
- throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: " + id.getParentId());
+ throw new IllegalStateException(
+ "Cannot lookup a session from a connection that had not been registered: "
+ + id.getParentId());
return cs;
}
protected TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
TransportConnectionState cs = connectionState;
if (cs == null)
- throw new IllegalStateException("Cannot lookup a connection that had not been registered: " + connectionId);
+ throw new IllegalStateException("Cannot lookup a connection that had not been registered: "
+ + connectionId);
return cs;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Wed Aug 8 11:56:59 2007
@@ -31,16 +31,16 @@
import org.apache.activemq.command.RemoveSubscriptionInfo;
public class BrokerView implements BrokerViewMBean {
-
+
final ManagedRegionBroker broker;
- private final BrokerService brokerService;
+ private final BrokerService brokerService;
private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception {
this.brokerService = brokerService;
- this.broker = managedBroker;
+ this.broker = managedBroker;
}
-
+
public ManagedRegionBroker getBroker() {
return broker;
}
@@ -48,31 +48,35 @@
public String getBrokerId() {
return broker.getBrokerId().toString();
}
-
+
public void gc() throws Exception {
- brokerService.getBroker().gc();
+ brokerService.getBroker().gc();
}
public void start() throws Exception {
- brokerService.start();
+ brokerService.start();
}
-
+
public void stop() throws Exception {
- brokerService.stop();
+ brokerService.stop();
}
-
+
public long getTotalEnqueueCount() {
- return broker.getDestinationStatistics().getEnqueues().getCount();
+ return broker.getDestinationStatistics().getEnqueues().getCount();
}
+
public long getTotalDequeueCount() {
return broker.getDestinationStatistics().getDequeues().getCount();
}
+
public long getTotalConsumerCount() {
return broker.getDestinationStatistics().getConsumers().getCount();
}
+
public long getTotalMessageCount() {
return broker.getDestinationStatistics().getMessages().getCount();
- }
+ }
+
public long getTotalMessagesCached() {
return broker.getDestinationStatistics().getMessagesCached().getCount();
}
@@ -80,71 +84,72 @@
public int getMemoryPercentageUsed() {
return brokerService.getMemoryManager().getPercentUsage();
}
+
public long getMemoryLimit() {
return brokerService.getMemoryManager().getLimit();
}
+
public void setMemoryLimit(long limit) {
- brokerService.getMemoryManager().setLimit(limit);
+ brokerService.getMemoryManager().setLimit(limit);
}
-
+
public void resetStatistics() {
broker.getDestinationStatistics().reset();
}
-
+
public void enableStatistics() {
broker.getDestinationStatistics().setEnabled(true);
- }
-
+ }
+
public void disableStatistics() {
broker.getDestinationStatistics().setEnabled(false);
- }
-
+ }
+
public boolean isStatisticsEnabled() {
- return broker.getDestinationStatistics().isEnabled();
+ return broker.getDestinationStatistics().isEnabled();
}
-
public void terminateJVM(int exitCode) {
System.exit(exitCode);
}
- public ObjectName[] getTopics(){
+ public ObjectName[] getTopics() {
return broker.getTopics();
}
- public ObjectName[] getQueues(){
+ public ObjectName[] getQueues() {
return broker.getQueues();
}
- public ObjectName[] getTemporaryTopics(){
+ public ObjectName[] getTemporaryTopics() {
return broker.getTemporaryTopics();
}
- public ObjectName[] getTemporaryQueues(){
+ public ObjectName[] getTemporaryQueues() {
return broker.getTemporaryQueues();
}
- public ObjectName[] getTopicSubscribers(){
- return broker.getTemporaryTopicSubscribers();
+ public ObjectName[] getTopicSubscribers() {
+ return broker.getTemporaryTopicSubscribers();
}
- public ObjectName[] getDurableTopicSubscribers(){
+ public ObjectName[] getDurableTopicSubscribers() {
return broker.getDurableTopicSubscribers();
}
- public ObjectName[] getQueueSubscribers(){
- return broker.getQueueSubscribers();
+ public ObjectName[] getQueueSubscribers() {
+ return broker.getQueueSubscribers();
}
- public ObjectName[] getTemporaryTopicSubscribers(){
+ public ObjectName[] getTemporaryTopicSubscribers() {
return broker.getTemporaryTopicSubscribers();
}
- public ObjectName[] getTemporaryQueueSubscribers(){
+ public ObjectName[] getTemporaryQueueSubscribers() {
return broker.getTemporaryQueueSubscribers();
}
-
- public ObjectName[] getInactiveDurableTopicSubscribers(){
+
+ public ObjectName[] getInactiveDurableTopicSubscribers() {
return broker.getInactiveDurableTopicSubscribers();
}
@@ -157,14 +162,17 @@
}
public void removeTopic(String name) throws Exception {
- broker.removeDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQTopic(name), 1000);
+ broker.removeDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQTopic(name),
+ 1000);
}
public void removeQueue(String name) throws Exception {
- broker.removeDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name), 1000);
+ broker.removeDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name),
+ 1000);
}
-
- public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception {
+
+ public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName,
+ String selector) throws Exception {
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
context.setClientId(clientId);
@@ -194,11 +202,10 @@
context.setClientId(clientId);
broker.removeSubscription(context, info);
}
-
-
+
/**
- * Returns the broker's administration connection context used for configuring the broker
- * at startup
+ * Returns the broker's administration connection context used for
+ * configuring the broker at startup
*/
public static ConnectionContext getConnectionContext(Broker broker) {
ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
@@ -208,11 +215,12 @@
}
return adminConnectionContext;
}
-
+
/**
- * Factory method to create the new administration connection context object.
- * Note this method is here rather than inside a default broker implementation to
- * ensure that the broker reference inside it is the outer most interceptor
+ * Factory method to create the new administration connection context
+ * object. Note this method is here rather than inside a default broker
+ * implementation to ensure that the broker reference inside it is the outer
+ * most interceptor
*/
protected static ConnectionContext createAdminConnectionContext(Broker broker) {
ConnectionContext context = new ConnectionContext();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java Wed Aug 8 11:56:59 2007
@@ -33,36 +33,34 @@
public void stop() throws Exception {
connection.stop();
}
-
+
/**
* @return true if the Connection is slow
*/
public boolean isSlow() {
return connection.isSlow();
}
-
+
/**
* @return if after being marked, the Connection is still writing
*/
public boolean isBlocked() {
return connection.isBlocked();
}
-
-
+
/**
* @return true if the Connection is connected
*/
public boolean isConnected() {
return connection.isConnected();
}
-
+
/**
* @return true if the Connection is active
*/
public boolean isActive() {
return connection.isActive();
}
-
/**
* Returns the number of messages to be dispatched to this connection
@@ -70,7 +68,7 @@
public int getDispatchQueueSize() {
return connection.getDispatchQueueSize();
}
-
+
/**
* Resets the statistics
*/
@@ -85,7 +83,7 @@
*/
public long getEnqueueCount() {
return connection.getStatistics().getEnqueues().getCount();
-
+
}
/**
@@ -97,12 +95,12 @@
return connection.getStatistics().getDequeues().getCount();
}
- public String getRemoteAddress() {
- return connection.getRemoteAddress();
- }
-
- public String getConnectionId() {
- return connection.getConnectionId();
- }
+ public String getRemoteAddress() {
+ return connection.getRemoteAddress();
+ }
+
+ public String getConnectionId() {
+ return connection.getConnectionId();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTransportConnection.java Wed Aug 8 11:56:59 2007
@@ -47,10 +47,11 @@
private ConnectionViewMBean mbean;
private ObjectName byClientIdName;
- private ObjectName byAddressName;
+ private ObjectName byAddressName;
- public ManagedTransportConnection(TransportConnector connector, Transport transport, Broker broker, TaskRunnerFactory factory, MBeanServer server,
- ObjectName connectorName) throws IOException {
+ public ManagedTransportConnection(TransportConnector connector, Transport transport, Broker broker,
+ TaskRunnerFactory factory, MBeanServer server, ObjectName connectorName)
+ throws IOException {
super(connector, transport, broker, factory);
this.server = server;
this.connectorName = connectorName;
@@ -64,12 +65,12 @@
setPendingStop(true);
return;
}
- synchronized(this) {
- unregisterMBean(byClientIdName);
- unregisterMBean(byAddressName);
- byClientIdName=null;
- byAddressName=null;
- }
+ synchronized (this) {
+ unregisterMBean(byClientIdName);
+ unregisterMBean(byAddressName);
+ byClientIdName = null;
+ byAddressName = null;
+ }
super.doStop();
}
@@ -85,9 +86,9 @@
Response answer = super.processAddConnection(info);
String clientId = info.getClientId();
if (clientId != null) {
- if(byClientIdName==null) {
- byClientIdName = createByClientIdObjectName(clientId);
- registerMBean(byClientIdName);
+ if (byClientIdName == null) {
+ byClientIdName = createByClientIdObjectName(clientId);
+ registerMBean(byClientIdName);
}
}
return answer;
@@ -96,24 +97,23 @@
// Implementation methods
// -------------------------------------------------------------------------
protected void registerMBean(ObjectName name) {
- if( name!=null ) {
- try {
- server.registerMBean(mbean, name);
- } catch (Throwable e) {
- log.warn("Failed to register MBean: "+name);
- log.debug("Failure reason: "+e,e);
- }
- }
+ if (name != null) {
+ try {
+ server.registerMBean(mbean, name);
+ } catch (Throwable e) {
+ log.warn("Failed to register MBean: " + name);
+ log.debug("Failure reason: " + e, e);
+ }
+ }
}
protected void unregisterMBean(ObjectName name) {
if (name != null) {
try {
server.unregisterMBean(name);
- }
- catch (Throwable e) {
+ } catch (Throwable e) {
log.warn("Failed to unregister mbean: " + name);
- log.debug("Failure reason: "+e,e);
+ log.debug("Failure reason: " + e, e);
}
}
}
@@ -122,36 +122,29 @@
// Build the object name for the destination
Hashtable map = connectorName.getKeyPropertyList();
try {
- return new ObjectName(
- connectorName.getDomain()+":"+
- "BrokerName="+JMXSupport.encodeObjectNamePart((String) map.get("BrokerName"))+","+
- "Type=Connection,"+
- "ConnectorName="+JMXSupport.encodeObjectNamePart((String) map.get("ConnectorName"))+","+
- "ViewType="+JMXSupport.encodeObjectNamePart(type)+","+
- "Name="+JMXSupport.encodeObjectNamePart(value)
- );
- }
- catch (Throwable e) {
+ return new ObjectName(connectorName.getDomain() + ":" + "BrokerName="
+ + JMXSupport.encodeObjectNamePart((String)map.get("BrokerName")) + ","
+ + "Type=Connection," + "ConnectorName="
+ + JMXSupport.encodeObjectNamePart((String)map.get("ConnectorName")) + ","
+ + "ViewType=" + JMXSupport.encodeObjectNamePart(type) + "," + "Name="
+ + JMXSupport.encodeObjectNamePart(value));
+ } catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
}
-
+
protected ObjectName createByClientIdObjectName(String value) throws IOException {
// Build the object name for the destination
Hashtable map = connectorName.getKeyPropertyList();
try {
- return new ObjectName(
- connectorName.getDomain()+":"+
- "BrokerName="+JMXSupport.encodeObjectNamePart((String) map.get("BrokerName"))+","+
- "Type=Connection,"+
- "ConnectorName="+JMXSupport.encodeObjectNamePart((String) map.get("ConnectorName"))+","+
- "Connection="+JMXSupport.encodeObjectNamePart(value)
- );
- }
- catch (Throwable e) {
+ return new ObjectName(connectorName.getDomain() + ":" + "BrokerName="
+ + JMXSupport.encodeObjectNamePart((String)map.get("BrokerName")) + ","
+ + "Type=Connection," + "ConnectorName="
+ + JMXSupport.encodeObjectNamePart((String)map.get("ConnectorName")) + ","
+ + "Connection=" + JMXSupport.encodeObjectNamePart(value));
+ } catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java Wed Aug 8 11:56:59 2007
@@ -22,40 +22,40 @@
private final NetworkBridge bridge;
- public NetworkBridgeView(NetworkBridge bridge) {
- this.bridge = bridge;
+ public NetworkBridgeView(NetworkBridge bridge) {
+ this.bridge = bridge;
}
-
+
public void start() throws Exception {
- bridge.start();
+ bridge.start();
}
public void stop() throws Exception {
- bridge.stop();
+ bridge.stop();
}
-
+
public String getLocalAddress() {
- return bridge.getLocalAddress();
+ return bridge.getLocalAddress();
}
public String getRemoteAddress() {
- return bridge.getRemoteAddress();
+ return bridge.getRemoteAddress();
}
public String getRemoteBrokerName() {
- return bridge.getRemoteBrokerName();
+ return bridge.getRemoteBrokerName();
}
public String getLocalBrokerName() {
- return bridge.getLocalBrokerName();
+ return bridge.getLocalBrokerName();
}
public long getEnqueueCounter() {
- return bridge.getEnqueueCounter();
+ return bridge.getEnqueueCounter();
}
public long getDequeueCounter() {
- return bridge.getDequeueCounter();
+ return bridge.getDequeueCounter();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java Wed Aug 8 11:56:59 2007
@@ -25,92 +25,92 @@
public NetworkConnectorView(NetworkConnector connector) {
this.connector = connector;
}
-
+
public void start() throws Exception {
connector.start();
}
public void stop() throws Exception {
- connector.stop();
+ connector.stop();
+ }
+
+ public String getName() {
+ return connector.getName();
+ }
+
+ public int getNetworkTTL() {
+ return connector.getNetworkTTL();
+ }
+
+ public int getPrefetchSize() {
+ return connector.getPrefetchSize();
+ }
+
+ public String getUserName() {
+ return connector.getUserName();
+ }
+
+ public boolean isBridgeTempDestinations() {
+ return connector.isBridgeTempDestinations();
+ }
+
+ public boolean isConduitSubscriptions() {
+ return connector.isConduitSubscriptions();
+ }
+
+ public boolean isDecreaseNetworkConsumerPriority() {
+ return connector.isDecreaseNetworkConsumerPriority();
+ }
+
+ public boolean isDispatchAsync() {
+ return connector.isDispatchAsync();
+ }
+
+ public boolean isDynamicOnly() {
+ return connector.isDynamicOnly();
}
- public String getName() {
- return connector.getName();
- }
-
- public int getNetworkTTL() {
- return connector.getNetworkTTL();
- }
-
- public int getPrefetchSize() {
- return connector.getPrefetchSize();
- }
-
- public String getUserName() {
- return connector.getUserName();
- }
-
- public boolean isBridgeTempDestinations() {
- return connector.isBridgeTempDestinations();
- }
-
- public boolean isConduitSubscriptions() {
- return connector.isConduitSubscriptions();
- }
-
- public boolean isDecreaseNetworkConsumerPriority() {
- return connector.isDecreaseNetworkConsumerPriority();
- }
-
- public boolean isDispatchAsync() {
- return connector.isDispatchAsync();
- }
-
- public boolean isDynamicOnly() {
- return connector.isDynamicOnly();
- }
-
- public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
- connector.setBridgeTempDestinations(bridgeTempDestinations);
- }
-
- public void setConduitSubscriptions(boolean conduitSubscriptions) {
- connector.setConduitSubscriptions(conduitSubscriptions);
- }
-
- public void setDispatchAsync(boolean dispatchAsync) {
- connector.setDispatchAsync(dispatchAsync);
- }
-
- public void setDynamicOnly(boolean dynamicOnly) {
- connector.setDynamicOnly(dynamicOnly);
- }
-
- public void setNetworkTTL(int networkTTL) {
- connector.setNetworkTTL(networkTTL);
- }
-
- public void setPassword(String password) {
- connector.setPassword(password);
- }
-
- public void setPrefetchSize(int prefetchSize) {
- connector.setPrefetchSize(prefetchSize);
- }
-
- public void setUserName(String userName) {
- connector.setUserName(userName);
- }
-
- public String getPassword() {
- String pw = connector.getPassword();
- // Hide the password for security reasons.
- if( pw!= null )
- pw = pw.replaceAll(".", "*");
- return pw;
- }
-
- public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
- connector.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
- }
+ public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
+ connector.setBridgeTempDestinations(bridgeTempDestinations);
+ }
+
+ public void setConduitSubscriptions(boolean conduitSubscriptions) {
+ connector.setConduitSubscriptions(conduitSubscriptions);
+ }
+
+ public void setDispatchAsync(boolean dispatchAsync) {
+ connector.setDispatchAsync(dispatchAsync);
+ }
+
+ public void setDynamicOnly(boolean dynamicOnly) {
+ connector.setDynamicOnly(dynamicOnly);
+ }
+
+ public void setNetworkTTL(int networkTTL) {
+ connector.setNetworkTTL(networkTTL);
+ }
+
+ public void setPassword(String password) {
+ connector.setPassword(password);
+ }
+
+ public void setPrefetchSize(int prefetchSize) {
+ connector.setPrefetchSize(prefetchSize);
+ }
+
+ public void setUserName(String userName) {
+ connector.setUserName(userName);
+ }
+
+ public String getPassword() {
+ String pw = connector.getPassword();
+ // Hide the password for security reasons.
+ if (pw != null)
+ pw = pw.replaceAll(".", "*");
+ return pw;
+ }
+
+ public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
+ connector.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorViewMBean.java Wed Aug 8 11:56:59 2007
@@ -20,24 +20,42 @@
public interface NetworkConnectorViewMBean extends Service {
- public String getName();
- public int getNetworkTTL();
- public int getPrefetchSize();
- public String getUserName();
- public boolean isBridgeTempDestinations();
- public boolean isConduitSubscriptions();
- public boolean isDecreaseNetworkConsumerPriority();
- public boolean isDispatchAsync();
- public boolean isDynamicOnly();
- public void setBridgeTempDestinations(boolean bridgeTempDestinations);
- public void setConduitSubscriptions(boolean conduitSubscriptions);
- public void setDispatchAsync(boolean dispatchAsync);
- public void setDynamicOnly(boolean dynamicOnly);
- public void setNetworkTTL(int networkTTL);
- public void setPassword(String password);
- public void setPrefetchSize(int prefetchSize);
- public void setUserName(String userName);
- public String getPassword();
- public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority);
+ public String getName();
+
+ public int getNetworkTTL();
+
+ public int getPrefetchSize();
+
+ public String getUserName();
+
+ public boolean isBridgeTempDestinations();
+
+ public boolean isConduitSubscriptions();
+
+ public boolean isDecreaseNetworkConsumerPriority();
+
+ public boolean isDispatchAsync();
+
+ public boolean isDynamicOnly();
+
+ public void setBridgeTempDestinations(boolean bridgeTempDestinations);
+
+ public void setConduitSubscriptions(boolean conduitSubscriptions);
+
+ public void setDispatchAsync(boolean dispatchAsync);
+
+ public void setDynamicOnly(boolean dynamicOnly);
+
+ public void setNetworkTTL(int networkTTL);
+
+ public void setPassword(String password);
+
+ public void setPrefetchSize(int prefetchSize);
+
+ public void setUserName(String userName);
+
+ public String getPassword();
+
+ public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java Wed Aug 8 11:56:59 2007
@@ -22,40 +22,37 @@
import javax.jms.InvalidSelectorException;
-
-
/**
* @version $Revision: 1.5 $
*/
public class SubscriptionView implements SubscriptionViewMBean {
-
-
+
protected final Subscription subscription;
protected final String clientId;
-
-
+
/**
* Constructor
+ *
* @param subs
*/
- public SubscriptionView(String clientId,Subscription subs){
+ public SubscriptionView(String clientId, Subscription subs) {
this.clientId = clientId;
this.subscription = subs;
}
-
+
/**
* @return the clientId
*/
- public String getClientId(){
+ public String getClientId() {
return clientId;
}
-
+
/**
* @return the id of the Connection the Subscription is on
*/
- public String getConnectionId(){
+ public String getConnectionId() {
ConsumerInfo info = getConsumerInfo();
- if (info != null){
+ if (info != null) {
return info.getConsumerId().getConnectionId();
}
return "NOTSET";
@@ -64,9 +61,9 @@
/**
* @return the id of the Session the subscription is on
*/
- public long getSessionId(){
+ public long getSessionId() {
ConsumerInfo info = getConsumerInfo();
- if (info != null){
+ if (info != null) {
return info.getConsumerId().getSessionId();
}
return 0;
@@ -75,9 +72,9 @@
/**
* @return the id of the Subscription
*/
- public long getSubcriptionId(){
+ public long getSubcriptionId() {
ConsumerInfo info = getConsumerInfo();
- if (info != null){
+ if (info != null) {
return info.getConsumerId().getValue();
}
return 0;
@@ -86,9 +83,9 @@
/**
* @return the destination name
*/
- public String getDestinationName(){
+ public String getDestinationName() {
ConsumerInfo info = getConsumerInfo();
- if (info != null){
+ if (info != null) {
ActiveMQDestination dest = info.getDestination();
return dest.getPhysicalName();
}
@@ -105,8 +102,7 @@
public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
if (subscription != null) {
subscription.setSelector(selector);
- }
- else {
+ } else {
throw new UnsupportedOperationException("No subscription object");
}
}
@@ -114,9 +110,9 @@
/**
* @return true if the destination is a Queue
*/
- public boolean isDestinationQueue(){
+ public boolean isDestinationQueue() {
ConsumerInfo info = getConsumerInfo();
- if (info != null){
+ if (info != null) {
ActiveMQDestination dest = info.getDestination();
return dest.isQueue();
}
@@ -126,9 +122,9 @@
/**
* @return true of the destination is a Topic
*/
- public boolean isDestinationTopic(){
+ public boolean isDestinationTopic() {
ConsumerInfo info = getConsumerInfo();
- if (info != null){
+ if (info != null) {
ActiveMQDestination dest = info.getDestination();
return dest.isTopic();
}
@@ -138,32 +134,32 @@
/**
* @return true if the destination is temporary
*/
- public boolean isDestinationTemporary(){
+ public boolean isDestinationTemporary() {
ConsumerInfo info = getConsumerInfo();
- if (info != null){
+ if (info != null) {
ActiveMQDestination dest = info.getDestination();
return dest.isTemporary();
}
return false;
}
-
+
/**
* @return true if the subscriber is active
*/
- public boolean isActive(){
+ public boolean isActive() {
return true;
}
/**
- * The subscription should release as may references as it can to help the garbage collector
- * reclaim memory.
+ * The subscription should release as may references as it can to help the
+ * garbage collector reclaim memory.
*/
- public void gc(){
- if (subscription != null){
- subscription.gc();
+ public void gc() {
+ if (subscription != null) {
+ subscription.gc();
}
}
-
+
/**
* @return whether or not the subscriber is retroactive or not
*/
@@ -171,7 +167,7 @@
ConsumerInfo info = getConsumerInfo();
return info != null ? info.isRetroactive() : false;
}
-
+
/**
* @return whether or not the subscriber is an exclusive consumer
*/
@@ -179,8 +175,7 @@
ConsumerInfo info = getConsumerInfo();
return info != null ? info.isExclusive() : false;
}
-
-
+
/**
* @return whether or not the subscriber is durable (persistent)
*/
@@ -188,7 +183,7 @@
ConsumerInfo info = getConsumerInfo();
return info != null ? info.isDurable() : false;
}
-
+
/**
* @return whether or not the subscriber ignores local messages
*/
@@ -196,17 +191,18 @@
ConsumerInfo info = getConsumerInfo();
return info != null ? info.isNoLocal() : false;
}
-
-
+
/**
- * @return the maximum number of pending messages allowed in addition to the prefetch size. If enabled
- * to a non-zero value then this will perform eviction of messages for slow consumers on non-durable topics.
+ * @return the maximum number of pending messages allowed in addition to the
+ * prefetch size. If enabled to a non-zero value then this will
+ * perform eviction of messages for slow consumers on non-durable
+ * topics.
*/
public int getMaximumPendingMessageLimit() {
ConsumerInfo info = getConsumerInfo();
return info != null ? info.getMaximumPendingMessageLimit() : 0;
}
-
+
/**
* @return the consumer priority
*/
@@ -214,29 +210,30 @@
ConsumerInfo info = getConsumerInfo();
return info != null ? info.getPriority() : 0;
}
-
+
/**
- * @return the name of the consumer which is only used for durable consumers.
+ * @return the name of the consumer which is only used for durable
+ * consumers.
*/
public String getSubcriptionName() {
ConsumerInfo info = getConsumerInfo();
return info != null ? info.getSubscriptionName() : null;
}
-
+
/**
* @return number of messages pending delivery
*/
- public int getPendingQueueSize(){
+ public int getPendingQueueSize() {
return subscription != null ? subscription.getPendingQueueSize() : 0;
}
-
+
/**
* @return number of messages dispatched
*/
- public int getDispatchedQueueSize(){
+ public int getDispatchedQueueSize() {
return subscription != null ? subscription.getDispatchedQueueSize() : 0;
}
-
+
/**
* @return number of messages that matched the subscription
*/
@@ -258,15 +255,15 @@
return subscription != null ? subscription.getDequeueCounter() : 0;
}
- protected ConsumerInfo getConsumerInfo(){
+ protected ConsumerInfo getConsumerInfo() {
return subscription != null ? subscription.getConsumerInfo() : null;
}
-
+
/**
- *@return pretty print
+ * @return pretty print
*/
- public String toString(){
- return "SubscriptionView: " + getClientId() + ":" + getConnectionId();
+ public String toString() {
+ return "SubscriptionView: " + getClientId() + ":" + getConnectionId();
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicSubscriptionView.java Wed Aug 8 11:56:59 2007
@@ -29,7 +29,7 @@
}
protected TopicSubscription getTopicSubscription() {
- return (TopicSubscription) subscription;
+ return (TopicSubscription)subscription;
}
/**
@@ -47,14 +47,14 @@
TopicSubscription topicSubscription = getTopicSubscription();
return topicSubscription != null ? topicSubscription.getMaximumPendingMessages() : 0;
}
-
+
/**
*
*/
public void setMaximumPendingQueueSize(int max) {
TopicSubscription topicSubscription = getTopicSubscription();
- if ( topicSubscription != null ) {
- topicSubscription.setMaximumPendingMessages(max);
+ if (topicSubscription != null) {
+ topicSubscription.setMaximumPendingMessages(max);
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Wed Aug 8 11:56:59 2007
@@ -27,32 +27,37 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.TopicMessageStore;
/**
- *
* @version $Revision: 1.12 $
*/
public interface Destination extends Service {
void addSubscription(ConnectionContext context, Subscription sub) throws Exception;
+
void removeSubscription(ConnectionContext context, Subscription sub) throws Exception;
-
+
void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception;
+
boolean lock(MessageReference node, LockOwner lockOwner);
+
void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException;
-
+
void gc();
-
+
ActiveMQDestination getActiveMQDestination();
+
UsageManager getUsageManager();
void dispose(ConnectionContext context) throws IOException;
-
+
DestinationStatistics getDestinationStatistics();
+
DeadLetterStrategy getDeadLetterStrategy();
-
+
public Message[] browse();
+
public String getName();
- public MessageStore getMessageStore();
+
+ public MessageStore getMessageStore();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Wed Aug 8 11:56:59 2007
@@ -16,6 +16,10 @@
*/
package org.apache.activemq.broker.region;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -26,10 +30,6 @@
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Set;
-
/**
*
* @version $Revision$
@@ -42,7 +42,8 @@
this.next = next;
}
- public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
+ public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
+ throws IOException {
next.acknowledge(context, sub, ack, node);
}
@@ -105,17 +106,18 @@
/**
* Sends a message to the given destination which may be a wildcard
*/
- protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
+ protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination)
+ throws Exception {
Broker broker = context.getConnectionContext().getBroker();
Set destinations = broker.getDestinations(destination);
for (Iterator iter = destinations.iterator(); iter.hasNext();) {
- Destination dest = (Destination) iter.next();
+ Destination dest = (Destination)iter.next();
dest.send(context, message);
}
}
- public MessageStore getMessageStore() {
- return next.getMessageStore();
- }
+ public MessageStore getMessageStore() {
+ return next.getMessageStore();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java Wed Aug 8 11:56:59 2007
@@ -24,103 +24,102 @@
import org.apache.activemq.command.MessageId;
/**
- * Only used by the {@link QueueMessageReference#NULL_MESSAGE}
+ * Only used by the {@link QueueMessageReference#NULL_MESSAGE}
*/
-final class NullMessageReference implements
- QueueMessageReference {
+final class NullMessageReference implements QueueMessageReference {
- private ActiveMQMessage message = new ActiveMQMessage();
- private volatile int references;
-
- public void drop() {
- throw new RuntimeException("not implemented");
- }
-
- public LockOwner getLockOwner() {
- throw new RuntimeException("not implemented");
- }
-
- public boolean isAcked() {
- return false;
- }
-
- public boolean isDropped() {
- throw new RuntimeException("not implemented");
- }
-
- public boolean lock(LockOwner subscription) {
- return true;
- }
-
- public void setAcked(boolean b) {
- throw new RuntimeException("not implemented");
- }
-
- public void unlock() {
- }
-
- public int decrementReferenceCount() {
- return --references;
- }
-
- public long getExpiration() {
- throw new RuntimeException("not implemented");
- }
-
- public String getGroupID() {
- return null;
- }
-
- public int getGroupSequence() {
- return 0;
- }
-
- public Message getMessage() throws IOException {
- return message;
- }
-
- public Message getMessageHardRef() {
- throw new RuntimeException("not implemented");
- }
-
- public MessageId getMessageId() {
- return message.getMessageId();
- }
-
- public int getRedeliveryCounter() {
- throw new RuntimeException("not implemented");
- }
-
- public int getReferenceCount() {
- return references;
- }
-
- public Destination getRegionDestination() {
- return null;
- }
-
- public int getSize() {
- throw new RuntimeException("not implemented");
- }
-
- public ConsumerId getTargetConsumerId() {
- throw new RuntimeException("not implemented");
- }
-
- public void incrementRedeliveryCounter() {
- throw new RuntimeException("not implemented");
- }
-
- public int incrementReferenceCount() {
- return ++references;
- }
-
- public boolean isExpired() {
- throw new RuntimeException("not implemented");
- }
-
- public boolean isPersistent() {
- throw new RuntimeException("not implemented");
- }
+ private ActiveMQMessage message = new ActiveMQMessage();
+ private volatile int references;
+
+ public void drop() {
+ throw new RuntimeException("not implemented");
+ }
+
+ public LockOwner getLockOwner() {
+ throw new RuntimeException("not implemented");
+ }
+
+ public boolean isAcked() {
+ return false;
+ }
+
+ public boolean isDropped() {
+ throw new RuntimeException("not implemented");
+ }
+
+ public boolean lock(LockOwner subscription) {
+ return true;
+ }
+
+ public void setAcked(boolean b) {
+ throw new RuntimeException("not implemented");
+ }
+
+ public void unlock() {
+ }
+
+ public int decrementReferenceCount() {
+ return --references;
+ }
+
+ public long getExpiration() {
+ throw new RuntimeException("not implemented");
+ }
+
+ public String getGroupID() {
+ return null;
+ }
+
+ public int getGroupSequence() {
+ return 0;
+ }
+
+ public Message getMessage() throws IOException {
+ return message;
+ }
+
+ public Message getMessageHardRef() {
+ throw new RuntimeException("not implemented");
+ }
+
+ public MessageId getMessageId() {
+ return message.getMessageId();
+ }
+
+ public int getRedeliveryCounter() {
+ throw new RuntimeException("not implemented");
+ }
+
+ public int getReferenceCount() {
+ return references;
+ }
+
+ public Destination getRegionDestination() {
+ return null;
+ }
+
+ public int getSize() {
+ throw new RuntimeException("not implemented");
+ }
+
+ public ConsumerId getTargetConsumerId() {
+ throw new RuntimeException("not implemented");
+ }
+
+ public void incrementRedeliveryCounter() {
+ throw new RuntimeException("not implemented");
+ }
+
+ public int incrementReferenceCount() {
+ return ++references;
+ }
+
+ public boolean isExpired() {
+ throw new RuntimeException("not implemented");
+ }
+
+ public boolean isPersistent() {
+ throw new RuntimeException("not implemented");
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Wed Aug 8 11:56:59 2007
@@ -27,32 +27,29 @@
import org.apache.activemq.filter.MessageEvaluationContext;
public class QueueBrowserSubscription extends QueueSubscription {
-
+
boolean browseDone;
-
- public QueueBrowserSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
- super(broker,context, info);
+
+ public QueueBrowserSubscription(Broker broker, ConnectionContext context, ConsumerInfo info)
+ throws InvalidSelectorException {
+ super(broker, context, info);
}
-
+
protected boolean canDispatch(MessageReference node) {
return !((QueueMessageReference)node).isAcked();
}
-
+
public synchronized String toString() {
- return
- "QueueBrowserSubscription:" +
- " consumer="+info.getConsumerId()+
- ", destinations="+destinations.size()+
- ", dispatched="+dispatched.size()+
- ", delivered="+this.prefetchExtension+
- ", pending="+getPendingQueueSize();
+ return "QueueBrowserSubscription:" + " consumer=" + info.getConsumerId() + ", destinations="
+ + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered="
+ + this.prefetchExtension + ", pending=" + getPendingQueueSize();
}
public void browseDone() throws Exception {
browseDone = true;
add(QueueMessageReference.NULL_MESSAGE);
}
-
+
public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
return !browseDone && super.matches(node, context);
}
@@ -60,7 +57,8 @@
/**
* Since we are a browser we don't really remove the message from the queue.
*/
- protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
+ protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n)
+ throws IOException {
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java Wed Aug 8 11:56:59 2007
@@ -33,31 +33,30 @@
*/
public class QueueRegion extends AbstractRegion {
-
-
- public QueueRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
- DestinationFactory destinationFactory) {
- super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
+ public QueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics,
+ UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+ DestinationFactory destinationFactory) {
+ super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
public String toString() {
- return "QueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage()
- + "%";
+ return "QueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size()
+ + ", memory=" + memoryManager.getPercentUsage() + "%";
}
- protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+ protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
+ throws InvalidSelectorException {
if (info.isBrowser()) {
- return new QueueBrowserSubscription(broker,context, info);
- }
- else {
- return new QueueSubscription(broker,context, info);
+ return new QueueBrowserSubscription(broker, context, info);
+ } else {
+ return new QueueSubscription(broker, context, info);
}
}
protected Set getInactiveDestinations() {
Set inactiveDestinations = super.getInactiveDestinations();
for (Iterator iter = inactiveDestinations.iterator(); iter.hasNext();) {
- ActiveMQDestination dest = (ActiveMQDestination) iter.next();
+ ActiveMQDestination dest = (ActiveMQDestination)iter.next();
if (!dest.isQueue())
iter.remove();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Wed Aug 8 11:56:59 2007
@@ -60,12 +60,12 @@
* @version $Revision: 1.21 $
*/
public class Topic implements Destination {
- private static final Log log = LogFactory.getLog(Topic.class);
+ private static final Log LOG = LogFactory.getLog(Topic.class);
protected final ActiveMQDestination destination;
protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
protected final Valve dispatchValve = new Valve(true);
- protected final TopicMessageStore store;// this could be NULL! (If an
- // advsiory)
+ // this could be NULL! (If an advisory)
+ protected final TopicMessageStore store;
protected final UsageManager usageManager;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
@@ -349,8 +349,8 @@
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if (message.isExpired()) {
- if (log.isDebugEnabled()) {
- log.debug("Expired message: " + message);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Expired message: " + message);
}
return;
}
@@ -468,7 +468,7 @@
}
}
} catch (Throwable e) {
- log.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
+ LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
}
return (Message[])result.toArray(new Message[result.size()]);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java Wed Aug 8 11:56:59 2007
@@ -19,25 +19,25 @@
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store;
-
/**
- * Creates a FilePendingMessageCursor
- * *
- * @org.apache.xbean.XBean element="fileQueueCursor" description="Pending messages paged in from file"
+ * Creates a FilePendingMessageCursor *
+ *
+ * @org.apache.xbean.XBean element="fileQueueCursor" description="Pending
+ * messages paged in from file"
*
* @version $Revision$
*/
-public class FilePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy{
+public class FilePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy {
/**
* @param queue
* @param tmpStore
* @return the cursor
- * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue, org.apache.activemq.kaha.Store)
+ * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue,
+ * org.apache.activemq.kaha.Store)
*/
- public PendingMessageCursor getQueuePendingMessageCursor(Queue queue,Store tmpStore){
- return new FilePendingMessageCursor("PendingCursor:" + queue.getName(),tmpStore);
+ public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) {
+ return new FilePendingMessageCursor("PendingCursor:" + queue.getName(), tmpStore);
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java Wed Aug 8 11:56:59 2007
@@ -17,25 +17,26 @@
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store;
-
/**
- * Creates a PendIngMessageCursor for Durable subscribers
- * *
- * @org.apache.xbean.XBean element="fileCursor" description="Pending messages for durable subscribers
- * held in temporary files"
+ * Creates a PendIngMessageCursor for Durable subscribers *
+ *
+ * @org.apache.xbean.XBean element="fileCursor" description="Pending messages
+ * for durable subscribers held in temporary files"
*
* @version $Revision$
*/
-public class FilePendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy{
+public class FilePendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy {
/**
* @param name
* @param tmpStorage
* @param maxBatchSize
* @return a Cursor
- * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, org.apache.activemq.kaha.Store, int)
+ * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String,
+ * org.apache.activemq.kaha.Store, int)
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(String name,Store tmpStorage,int maxBatchSize){
- return new FilePendingMessageCursor("PendingCursor:" + name,tmpStorage);
+ public PendingMessageCursor getSubscriberPendingMessageCursor(String name, Store tmpStorage,
+ int maxBatchSize) {
+ return new FilePendingMessageCursor("PendingCursor:" + name, tmpStorage);
}
-}
\ No newline at end of file
+}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java Wed Aug 8 11:56:59 2007
@@ -19,25 +19,25 @@
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
import org.apache.activemq.kaha.Store;
-
/**
- * Creates a StoreQueueCursor
- * *
- * @org.apache.xbean.XBean element="storeCursor" description="Pending messages paged in from the Store"
+ * Creates a StoreQueueCursor *
+ *
+ * @org.apache.xbean.XBean element="storeCursor" description="Pending messages
+ * paged in from the Store"
*
* @version $Revision$
*/
-public class StorePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy{
+public class StorePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy {
/**
* @param queue
* @param tmpStore
* @return the cursor
- * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue, org.apache.activemq.kaha.Store)
+ * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue,
+ * org.apache.activemq.kaha.Store)
*/
- public PendingMessageCursor getQueuePendingMessageCursor(Queue queue,Store tmpStore){
- return new StoreQueueCursor(queue,tmpStore);
+ public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) {
+ return new StoreQueueCursor(queue, tmpStore);
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java Wed Aug 8 11:56:59 2007
@@ -20,20 +20,21 @@
import org.apache.activemq.kaha.Store;
/**
- * Creates a VMPendingMessageCursor
- * *
- * @org.apache.xbean.XBean element="vmQueueCursor" description="Pending messages held in the JVM"
+ * Creates a VMPendingMessageCursor *
+ *
+ * @org.apache.xbean.XBean element="vmQueueCursor" description="Pending messages
+ * held in the JVM"
*
* @version $Revision$
*/
-public class VMPendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy{
+public class VMPendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy {
/**
* @param queue
* @param tmpStore
- * @return the cursor
+ * @return the cursor
*/
- public PendingMessageCursor getQueuePendingMessageCursor(Queue queue,Store tmpStore){
+ public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) {
return new VMPendingMessageCursor();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java Wed Aug 8 11:56:59 2007
@@ -17,24 +17,26 @@
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.kaha.Store;
-
/**
- * Creates a VMPendingMessageCursor
- * *
- * @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the JVM"
+ * Creates a VMPendingMessageCursor *
+ *
+ * @org.apache.xbean.XBean element="vmCursor" description="Pending messages held
+ * in the JVM"
*
* @version $Revision$
*/
-public class VMPendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy{
+public class VMPendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy {
/**
* @param name
* @param tmpStorage
* @param maxBatchSize
* @return a Cursor
- * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, org.apache.activemq.kaha.Store, int)
+ * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String,
+ * org.apache.activemq.kaha.Store, int)
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(String name,Store tmpStorage,int maxBatchSize){
+ public PendingMessageCursor getSubscriberPendingMessageCursor(String name, Store tmpStorage,
+ int maxBatchSize) {
return new VMPendingMessageCursor();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/MulticastTraceBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/MulticastTraceBrokerPlugin.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/MulticastTraceBrokerPlugin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/MulticastTraceBrokerPlugin.java Wed Aug 8 11:56:59 2007
@@ -23,7 +23,8 @@
import java.net.URISyntaxException;
/**
- * A Broker interceptor which allows you to trace all operations to a Multicast socket.
+ * A Broker interceptor which allows you to trace all operations to a Multicast
+ * socket.
*
* @org.apache.xbean.XBean
*
@@ -31,30 +32,30 @@
*/
public class MulticastTraceBrokerPlugin extends UDPTraceBrokerPlugin {
- private int timeToLive = 1;
-
- public MulticastTraceBrokerPlugin() {
- try {
- destination = new URI("multicast://224.1.2.3:61616");
- } catch (URISyntaxException wontHappen) {
- }
- }
-
- protected DatagramSocket createSocket() throws IOException {
+ private int timeToLive = 1;
+
+ public MulticastTraceBrokerPlugin() {
+ try {
+ destination = new URI("multicast://224.1.2.3:61616");
+ } catch (URISyntaxException wontHappen) {
+ }
+ }
+
+ protected DatagramSocket createSocket() throws IOException {
MulticastSocket s = new MulticastSocket();
- s.setSendBufferSize(maxTraceDatagramSize);
- s.setBroadcast(broadcast);
+ s.setSendBufferSize(maxTraceDatagramSize);
+ s.setBroadcast(broadcast);
s.setLoopbackMode(true);
s.setTimeToLive(timeToLive);
return s;
- }
+ }
- public int getTimeToLive() {
- return timeToLive;
- }
+ public int getTimeToLive() {
+ return timeToLive;
+ }
- public void setTimeToLive(int timeToLive) {
- this.timeToLive = timeToLive;
- }
+ public void setTimeToLive(int timeToLive) {
+ this.timeToLive = timeToLive;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java Wed Aug 8 11:56:59 2007
@@ -20,27 +20,27 @@
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
-
/**
* A Broker interceptor which updates a JMS Client's timestamp on the message
- * with a broker timestamp. Useful when the clocks on client machines are known to
- * not be correct and you can only trust the time set on the broker machines.
+ * with a broker timestamp. Useful when the clocks on client machines are known
+ * to not be correct and you can only trust the time set on the broker machines.
*
- * Enabling this plugin will break JMS compliance since the timestamp that the producer
- * sees on the messages after as send() will be different from the timestamp the consumer
- * will observe when he receives the message. This plugin is not enabled in the default
- * ActiveMQ configuration.
+ * Enabling this plugin will break JMS compliance since the timestamp that the
+ * producer sees on the messages after as send() will be different from the
+ * timestamp the consumer will observe when he receives the message. This plugin
+ * is not enabled in the default ActiveMQ configuration.
*
- * @org.apache.xbean.XBean element="timeStampingBrokerPlugin"
+ * @org.apache.xbean.XBean element="timeStampingBrokerPlugin"
*
* @version $Revision$
*/
-public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
- public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
- if (message.getTimestamp() > 0 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) {
- //timestamp not been disabled and has not passed through a network
+public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
+ public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
+ if (message.getTimestamp() > 0
+ && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) {
+ // timestamp not been disabled and has not passed through a network
message.setTimestamp(System.currentTimeMillis());
}
- super.send(producerExchange, message);
- }
+ super.send(producerExchange, message);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/DestinationDotFileInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/DestinationDotFileInterceptor.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/DestinationDotFileInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/view/DestinationDotFileInterceptor.java Wed Aug 8 11:56:59 2007
@@ -16,6 +16,10 @@
*/
package org.apache.activemq.broker.view;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.Iterator;
+
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
@@ -23,12 +27,7 @@
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.filter.DestinationMapNode;
-import java.io.PrintWriter;
-import java.util.Collection;
-import java.util.Iterator;
-
/**
- *
* @version $Revision: $
*/
public class DestinationDotFileInterceptor extends DotFileInterceptorSupport {
@@ -45,13 +44,11 @@
return answer;
}
- public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
- throws Exception {
+ public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
super.removeDestination(context, destination, timeout);
generateFile();
}
-
protected void generateFile(PrintWriter writer) throws Exception {
ActiveMQDestination[] destinations = getDestinations();
@@ -71,7 +68,7 @@
writer.println("topic_root [fillcolor = deepskyblue, label = \"Topics\" ];");
writer.println("queue_root [fillcolor = deepskyblue, label = \"Queues\" ];");
writer.println();
-
+
writer.println("subgraph queues {");
writer.println(" node [fillcolor=red]; ");
writer.println(" label = \"Queues\"");
@@ -83,20 +80,20 @@
writer.println("subgraph topics {");
writer.println(" node [fillcolor=green]; ");
writer.println(" label = \"Topics\"");
- writer.println();
+ writer.println();
printNodeLinks(writer, map.getTopicRootNode(), "topic");
writer.println("}");
writer.println();
-
+
printNodes(writer, map.getQueueRootNode(), "queue");
writer.println();
-
+
printNodes(writer, map.getTopicRootNode(), "topic");
writer.println();
-
+
writer.println("}");
}
-
+
protected void printNodes(PrintWriter writer, DestinationMapNode node, String prefix) {
String path = getPath(node);
writer.print(" ");
@@ -106,8 +103,7 @@
String label = path;
if (prefix.equals("topic")) {
label = "Topics";
- }
- else if (prefix.equals("queue")) {
+ } else if (prefix.equals("queue")) {
label = "Queues";
}
writer.print("[ label = \"");
@@ -116,7 +112,7 @@
Collection children = node.getChildren();
for (Iterator iter = children.iterator(); iter.hasNext();) {
- DestinationMapNode child = (DestinationMapNode) iter.next();
+ DestinationMapNode child = (DestinationMapNode)iter.next();
printNodes(writer, child, prefix + ID_SEPARATOR + path);
}
}
@@ -125,7 +121,7 @@
String path = getPath(node);
Collection children = node.getChildren();
for (Iterator iter = children.iterator(); iter.hasNext();) {
- DestinationMapNode child = (DestinationMapNode) iter.next();
+ DestinationMapNode child = (DestinationMapNode)iter.next();
writer.print(" ");
writer.print(prefix);
@@ -142,7 +138,6 @@
printNodeLinks(writer, child, prefix + ID_SEPARATOR + path);
}
}
-
protected String getPath(DestinationMapNode node) {
String path = node.getPath();
|