Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 97326 invoked from network); 10 Oct 2007 19:53:57 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 10 Oct 2007 19:53:57 -0000 Received: (qmail 85552 invoked by uid 500); 10 Oct 2007 19:53:18 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 85533 invoked by uid 500); 10 Oct 2007 19:53:18 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 85519 invoked by uid 99); 10 Oct 2007 19:53:18 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Oct 2007 12:53:18 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Oct 2007 19:53:29 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id E69D81A9832; Wed, 10 Oct 2007 12:53:08 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r583595 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/network/ test/java/org/apache/activemq/network/ Date: Wed, 10 Oct 2007 19:53:08 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20071010195308.E69D81A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Wed Oct 10 12:53:07 2007 New Revision: 583595 URL: http://svn.apache.org/viewvc?rev=583595&view=rev Log: Fix for NPE in duplex network connection Added: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MapTransportConnectionStateRegister.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionState.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionStateRegister.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MapTransportConnectionStateRegister.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MapTransportConnectionStateRegister.java?rev=583595&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MapTransportConnectionStateRegister.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MapTransportConnectionStateRegister.java Wed Oct 10 12:53:07 2007 @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.SessionId; + +/** + * @version $Revision: 1.8 $ + */ + +public class MapTransportConnectionStateRegister implements TransportConnectionStateRegister{ + + private Map connectionStates = new ConcurrentHashMap(); + + public TransportConnectionState registerConnectionState(ConnectionId connectionId, + TransportConnectionState state) { + TransportConnectionState rc = connectionStates.put(connectionId, state); + return rc; + } + + public TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { + TransportConnectionState rc = connectionStates.remove(connectionId); + return rc; + } + + public List listConnectionStates() { + + List rc = new ArrayList(); + rc.addAll(connectionStates.values()); + return rc; + } + + public TransportConnectionState lookupConnectionState(String connectionId) { + return connectionStates.get(new ConnectionId(connectionId)); + } + + public TransportConnectionState lookupConnectionState(ConsumerId id) { + TransportConnectionState cs = lookupConnectionState(id.getConnectionId()); + if (cs == null) { + throw new IllegalStateException( + "Cannot lookup a consumer from a connection that had not been registered: " + + 
id.getParentId().getParentId()); + } + return cs; + } + + public TransportConnectionState lookupConnectionState(ProducerId id) { + TransportConnectionState cs = lookupConnectionState(id.getConnectionId()); + if (cs == null) { + throw new IllegalStateException( + "Cannot lookup a producer from a connection that had not been registered: " + + id.getParentId().getParentId()); + } + return cs; + } + + public TransportConnectionState lookupConnectionState(SessionId id) { + TransportConnectionState cs = lookupConnectionState(id.getConnectionId()); + if (cs == null) { + throw new IllegalStateException( + "Cannot lookup a session from a connection that had not been registered: " + + id.getParentId()); + } + return cs; + } + + public TransportConnectionState lookupConnectionState(ConnectionId connectionId) { + TransportConnectionState cs = connectionStates.get(connectionId); + if (cs == null) { + throw new IllegalStateException("Cannot lookup a connection that had not been registered: " + + connectionId); + } + return cs; + } + + + + public boolean doesHandleMultipleConnectionStates() { + return true; + } + + public boolean isEmpty() { + return connectionStates.isEmpty(); + } + + public void clear() { + connectionStates.clear(); + + } + + public void intialize(TransportConnectionStateRegister other) { + connectionStates.clear(); + connectionStates.putAll(other.mapStates()); + + } + + public Map mapStates() { + HashMap map = new HashMap(connectionStates); + return map; + } + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MapTransportConnectionStateRegister.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MapTransportConnectionStateRegister.java ------------------------------------------------------------------------------ svn:executable = * Added: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java?rev=583595&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java Wed Oct 10 12:53:07 2007 @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.SessionId; + +/** + * @version $Revision: 1.8 $ + */ + +public class SingleTransportConnectionStateRegister implements TransportConnectionStateRegister{ + + private TransportConnectionState connectionState; + private ConnectionId connectionId; + + public TransportConnectionState registerConnectionState(ConnectionId connectionId, + TransportConnectionState state) { + TransportConnectionState rc = connectionState; + connectionState = state; + this.connectionId = connectionId; + return rc; + } + + public synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { + TransportConnectionState rc = null; + + + if (connectionId != null && connectionState != null && this.connectionId!=null){ + if (this.connectionId.equals(connectionId)){ + rc = connectionState; + connectionState = null; + connectionId = null; + } + } + return rc; + } + + public synchronized List listConnectionStates() { + List rc = new ArrayList(); + if (connectionState != null) { + rc.add(connectionState); + } + return rc; + } + + public synchronized TransportConnectionState lookupConnectionState(String connectionId) { + TransportConnectionState cs = connectionState; + if (cs == null) { + throw new IllegalStateException( + "Cannot lookup a connectionId for a connection that had not been registered: " + + connectionId); + } + return cs; + } + + public synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { + TransportConnectionState cs = connectionState; + if (cs == null) { + throw new IllegalStateException( + "Cannot lookup a consumer from a connection that had not been 
registered: " + + id.getParentId().getParentId()); + } + return cs; + } + + public synchronized TransportConnectionState lookupConnectionState(ProducerId id) { + TransportConnectionState cs = connectionState; + if (cs == null) { + throw new IllegalStateException( + "Cannot lookup a producer from a connection that had not been registered: " + + id.getParentId().getParentId()); + } + return cs; + } + + public synchronized TransportConnectionState lookupConnectionState(SessionId id) { + TransportConnectionState cs = connectionState; + if (cs == null) { + throw new IllegalStateException( + "Cannot lookup a session from a connection that had not been registered: " + + id.getParentId()); + } + return cs; + } + + public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { + TransportConnectionState cs = connectionState; + if (cs == null) { + throw new IllegalStateException("Cannot lookup a connection that had not been registered: " + + connectionId); + } + return cs; + } + + public synchronized boolean doesHandleMultipleConnectionStates() { + return false; + } + + public synchronized boolean isEmpty() { + return connectionState == null; + } + + public void intialize(TransportConnectionStateRegister other) { + + if (other.isEmpty()){ + clear(); + }else{ + Map map = other.mapStates(); + Iterator i = map.entrySet().iterator(); + Map.Entry entry = (Entry) i.next(); + connectionId = entry.getKey(); + connectionState =entry.getValue(); + } + + } + + public Map mapStates() { + Map map = new HashMap(); + if (!isEmpty()) { + map.put(connectionId, connectionState); + } + return map; + } + + public void clear() { + connectionState=null; + connectionId=null; + + } + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java ------------------------------------------------------------------------------ svn:executable = * Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=583595&r1=583594&r2=583595&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Oct 10 12:53:07 2007 @@ -150,45 +150,7 @@ private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); private DemandForwardingBridge duplexBridge; private final TaskRunnerFactory taskRunnerFactory; - private TransportConnectionState connectionState; - - static class TransportConnectionState extends org.apache.activemq.state.ConnectionState { - - private ConnectionContext context; - private TransportConnection connection; - private final Object connectMutex = new Object(); - private AtomicInteger referenceCounter = new AtomicInteger(); - - public TransportConnectionState(ConnectionInfo info, TransportConnection transportConnection) { - super(info); - connection = transportConnection; - } - - public ConnectionContext getContext() { - return context; - } - - public TransportConnection getConnection() { - return connection; - } - - public void setContext(ConnectionContext context) { - this.context = context; - } - - public void setConnection(TransportConnection connection) { - this.connection = connection; - } - - public int incrementReference() { - return referenceCounter.incrementAndGet(); - } - - public int decrementReference() { - return 
referenceCounter.decrementAndGet(); - } - - } + private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); /** * @param connector @@ -555,7 +517,7 @@ SessionState ss = cs.getSessionState(sessionId); if (ss == null) { throw new IllegalStateException( - "Cannot add a consumer to a session that had not been registered: " + broker.getBrokerName() + " Cannot add a consumer to a session that had not been registered: " + sessionId); } // Avoid replaying dup commands @@ -598,6 +560,7 @@ try { cs.addSession(info); } catch (IllegalStateException e) { + e.printStackTrace(); broker.removeSession(cs.getContext(), info); } } @@ -659,7 +622,7 @@ // If there are 2 concurrent connections for the same connection id, // then last one in wins, we need to sync here // to figure out the winner. - synchronized (state.connectMutex) { + synchronized (state.getConnectionMutex()) { if (state.getConnection() != this) { LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress()); state.getConnection().stop(); @@ -1306,90 +1269,44 @@ return null; } - // ///////////////////////////////////////////////////////////////// - // - // The following methods handle the logical connection state. It is possible - // multiple logical connections multiplexed over a single physical - // connection. - // But have not yet exploited the feature from the clients, so for - // performance - // reasons (to avoid a hash lookup) this class only keeps track of 1 - // logical connection state. - // - // A sub class could override these methods to a full multiple logical - // connection - // support. 
- // - // ///////////////////////////////////////////////////////////////// - - protected TransportConnectionState registerConnectionState(ConnectionId connectionId, - TransportConnectionState state) { - TransportConnectionState rc = connectionState; - connectionState = state; - return rc; - } - - protected TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { - TransportConnectionState rc = connectionState; - connectionState = null; - return rc; - } - - protected List listConnectionStates() { - List rc = new ArrayList(); - if (connectionState != null) { - rc.add(connectionState); + protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,TransportConnectionState state) { + TransportConnectionState cs = null; + if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()){ + //swap implementations + TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister(); + newRegister.intialize(connectionStateRegister); + connectionStateRegister = newRegister; } - return rc; + cs= connectionStateRegister.registerConnectionState(connectionId, state); + return cs; } - protected TransportConnectionState lookupConnectionState(String connectionId) { - TransportConnectionState cs = connectionState; - if (cs == null) { - throw new IllegalStateException( - "Cannot lookup a connectionId for a connection that had not been registered: " - + connectionId); - } - return cs; + protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { + return connectionStateRegister.unregisterConnectionState(connectionId); } - protected TransportConnectionState lookupConnectionState(ConsumerId id) { - TransportConnectionState cs = connectionState; - if (cs == null) { - throw new IllegalStateException( - "Cannot lookup a consumer from a connection that had not been registered: " - + id.getParentId().getParentId()); - } - return 
cs; + protected synchronized List listConnectionStates() { + return connectionStateRegister.listConnectionStates(); } - protected TransportConnectionState lookupConnectionState(ProducerId id) { - TransportConnectionState cs = connectionState; - if (cs == null) { - throw new IllegalStateException( - "Cannot lookup a producer from a connection that had not been registered: " - + id.getParentId().getParentId()); - } - return cs; + protected synchronized TransportConnectionState lookupConnectionState(String connectionId) { + return connectionStateRegister.lookupConnectionState(connectionId); } - protected TransportConnectionState lookupConnectionState(SessionId id) { - TransportConnectionState cs = connectionState; - if (cs == null) { - throw new IllegalStateException( - "Cannot lookup a session from a connection that had not been registered: " - + id.getParentId()); - } - return cs; + protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { + return connectionStateRegister.lookupConnectionState(id); } - protected TransportConnectionState lookupConnectionState(ConnectionId connectionId) { - TransportConnectionState cs = connectionState; - if (cs == null) { - throw new IllegalStateException("Cannot lookup a connection that had not been registered: " - + connectionId); - } - return cs; + protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) { + return connectionStateRegister.lookupConnectionState(id); + } + + protected synchronized TransportConnectionState lookupConnectionState(SessionId id) { + return connectionStateRegister.lookupConnectionState(id); + } + + protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { + return connectionStateRegister.lookupConnectionState(connectionId); } } Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionState.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionState.java?rev=583595&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionState.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionState.java Wed Oct 10 12:53:07 2007 @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.command.ConnectionInfo; + +/** + * @version $Revision: 1.8 $ + */ + +public class TransportConnectionState extends org.apache.activemq.state.ConnectionState { + + private ConnectionContext context; + private TransportConnection connection; + private AtomicInteger referenceCounter = new AtomicInteger(); + private final Object connectionMutex = new Object(); + + public TransportConnectionState(ConnectionInfo info, TransportConnection transportConnection) { + super(info); + connection = transportConnection; + } + + public ConnectionContext getContext() { + return context; + } + + public TransportConnection getConnection() { + return connection; + } + + public void setContext(ConnectionContext context) { + this.context = context; + } + + public void setConnection(TransportConnection connection) { + this.connection = connection; + } + + public int incrementReference() { + return referenceCounter.incrementAndGet(); + } + + public int decrementReference() { + return referenceCounter.decrementAndGet(); + } + + public AtomicInteger getReferenceCounter() { + return referenceCounter; + } + + public void setReferenceCounter(AtomicInteger referenceCounter) { + this.referenceCounter = referenceCounter; + } + + public Object getConnectionMutex() { + return connectionMutex; + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionState.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionState.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionStateRegister.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionStateRegister.java?rev=583595&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionStateRegister.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionStateRegister.java Wed Oct 10 12:53:07 2007 @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker; + +import java.util.List; +import java.util.Map; + +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.SessionId; + +/** + * @version $Revision: 1.8 $ + */ + +public interface TransportConnectionStateRegister{ + + TransportConnectionState registerConnectionState(ConnectionId connectionId, + TransportConnectionState state); + + TransportConnectionState unregisterConnectionState(ConnectionId connectionId); + + List listConnectionStates(); + + Map mapStates(); + + TransportConnectionState lookupConnectionState(String connectionId); + + TransportConnectionState lookupConnectionState(ConsumerId id); + + TransportConnectionState lookupConnectionState(ProducerId id); + + TransportConnectionState lookupConnectionState(SessionId id); + + TransportConnectionState lookupConnectionState(ConnectionId connectionId); + + boolean isEmpty(); + + boolean doesHandleMultipleConnectionStates(); + + void intialize(TransportConnectionStateRegister other); + + void clear(); + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionStateRegister.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionStateRegister.java ------------------------------------------------------------------------------ svn:executable = * Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=583595&r1=583594&r2=583595&view=diff ============================================================================== --- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Oct 10 12:53:07 2007 @@ -100,6 +100,7 @@ protected final ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap(); protected final BrokerId localBrokerPath[] = new BrokerId[] {null}; protected CountDownLatch startedLatch = new CountDownLatch(2); + protected CountDownLatch localStartedLatch = new CountDownLatch(1); protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1); protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false); protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); @@ -172,6 +173,7 @@ localBridgeStarted.set(false); remoteBridgeStarted.set(false); startedLatch = new CountDownLatch(2); + localStartedLatch = new CountDownLatch(1); } } @@ -261,6 +263,7 @@ LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); startedLatch.countDown(); + localStartedLatch.countDown(); setupStaticDestinations(); } } @@ -339,6 +342,7 @@ // stuck waiting for it to start up. 
startedLatch.countDown(); startedLatch.countDown(); + localStartedLatch.countDown(); ss.throwFirstException(); } } @@ -406,6 +410,7 @@ localBroker.oneway(command); break; case ConsumerInfo.DATA_STRUCTURE_TYPE: + localStartedLatch.await(); if (!addConsumerInfo((ConsumerInfo)command)) { if (LOG.isDebugEnabled()) { LOG.debug("Ignoring ConsumerInfo: " + command); @@ -430,6 +435,7 @@ } } } catch (Throwable e) { + e.printStackTrace(); serviceRemoteException(e); } } @@ -554,7 +560,7 @@ try { if (command.isMessageDispatch()) { enqueueCounter.incrementAndGet(); - waitStarted(); + //localStartedLatch.await(); final MessageDispatch md = (MessageDispatch)command; DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); if (sub != null) { @@ -628,7 +634,8 @@ LOG.warn("Unexpected local command: " + command); } } - } catch (Exception e) { + } catch (Throwable e) { + e.printStackTrace(); serviceLocalException(e); } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=583595&r1=583594&r2=583595&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Wed Oct 10 12:53:07 2007 @@ -90,18 +90,19 @@ } } - public void xtestFiltering() throws Exception { + public void testFiltering() throws Exception { MessageConsumer includedConsumer = remoteSession.createConsumer(included); MessageConsumer excludedConsumer = remoteSession.createConsumer(excluded); MessageProducer includedProducer = localSession.createProducer(included); MessageProducer excludedProducer = localSession.createProducer(excluded); - Thread.sleep(1000); + // allow for consumer infos to 
percolate around + Thread.sleep(2000); Message test = localSession.createTextMessage("test"); includedProducer.send(test); excludedProducer.send(test); - assertNull(excludedConsumer.receive(500)); - assertNotNull(includedConsumer.receive(500)); + assertNull(excludedConsumer.receive(1000)); + assertNotNull(includedConsumer.receive(1000)); } public void xtestConduitBridge() throws Exception {