Stirling Chow created AMQ-4159:
----------------------------------

             Summary: Lack of thread-safety in SimpleDiscoveryAgent can cause multiple concurrent attempts to establish bridge, which can result in permanent bridge failure
                 Key: AMQ-4159
                 URL: https://issues.apache.org/jira/browse/AMQ-4159
             Project: ActiveMQ
          Issue Type: Bug
    Affects Versions: 5.8.0
            Reporter: Stirling Chow


Symptom
=======
I was diagnosing a deadlock issue in {{DiscoveryNetworkConnector}} and noticed that during one of the tests, concurrent calls were being made to {{DiscoveryNetworkConnector.onServiceAdd}} for the same {{DiscoveryEvent}}. This was unexpected because only a single service (URL) had been given to {{SimpleDiscoveryAgent}}. In fact, during one of the tests I observed dozens of concurrent calls.

Concurrent attempts to establish a bridge to the *same* remote broker are problematic because they expose a number of race conditions in {{DiscoveryNetworkConnector}} and {{RegionBroker}} that can lead to permanent bridge failure, as well as unnecessary thread pool execution/resource usage and logging.

The race conditions will be filed as separate issues. This issue specifically addresses the bug that causes {{SimpleDiscoveryAgent}} to uncontrollably multiply bridge connection attempts.

Cause
=====
When {{DemandForwardingBridgeSupport}} handles exceptions from either the local or remote sides of the bridge, it fires a "bridge failed" event:

{code:title=DemandForwardingBridgeSupport.java}
public void serviceLocalException(Throwable error) {
    if (!disposed.get()) {
        LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
        LOG.debug("The local Exception was:" + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}

public void serviceRemoteException(Throwable error) {
    if (!disposed.get()) {
        if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
            LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
        } else {
            LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
        }
        LOG.debug("The remote Exception was: " + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}

private void fireBridgeFailed() {
    NetworkBridgeListener l = this.networkBridgeListener;
    if (l != null) {
        l.bridgeFailed();
    }
}
{code}

{{DiscoveryNetworkConnector}} is the {{NetworkBridgeListener}}, and its {{bridgeFailed}} method calls back to {{SimpleDiscoveryAgent.serviceFailed(...)}}:

{code:title=DiscoveryNetworkConnector.java}
protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
    class DiscoverNetworkBridgeListener extends MBeanNetworkListener {

        public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) {
            super(brokerService, connectorName);
        }

        public void bridgeFailed() {
            if (!serviceSupport.isStopped()) {
                try {
                    discoveryAgent.serviceFailed(event);
                } catch (IOException e) {
                }
            }
        }
    }
    ...
{code}

In response, {{SimpleDiscoveryAgent.serviceFailed(...)}} pauses for the {{reconnectDelay}} before attempting to re-establish the bridge via {{DiscoveryNetworkConnector.onServiceAdd(...)}}:

{code:title=SimpleDiscoveryAgent.java}
public void serviceFailed(DiscoveryEvent devent) throws IOException {
    final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
    if (event.failed.compareAndSet(false, true)) {
        listener.onServiceRemove(event);
        taskRunner.execute(new Runnable() {
            public void run() {
                // We detect a failed connection attempt because the service
                // fails right
                // away.
                if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
                    LOG.debug("Failure occurred soon after the discovery event was generated. It will be classified as a connection failure: "+event);
                    ...
                synchronized (sleepMutex) {
                    try {
                        if (!running.get()) {
                            LOG.debug("Reconnecting disabled: stopped");
                            return;
                        }
                        LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
                        sleepMutex.wait(event.reconnectDelay);
                    } catch (InterruptedException ie) {
                        LOG.debug("Reconnecting disabled: " + ie);
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                ...
                event.connectTime = System.currentTimeMillis();
                event.failed.set(false);
                listener.onServiceAdd(event);
            }
        }, "Simple Discovery Agent");
    }
}
{code}
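
To make the role of the {{failed}} flag in {{serviceFailed}} easier to see, here is a minimal standalone sketch. It is not ActiveMQ code and does not try to reproduce the bug: the class {{FailedFlagGuardSketch}}, its {{Event}} type and the {{concurrentReports}} helper are hypothetical names invented for illustration, and the only thing modelled is an {{AtomicBoolean}} used the way {{event.failed}} is used above. Under those assumptions it shows that {{compareAndSet(false, true)}} lets exactly one of several simultaneous failure reports through, that further reports are ignored while the flag stays set, and that a report is accepted again once the flag has been reset (which the reconnect task does just before calling {{onServiceAdd}}).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in, NOT ActiveMQ code: models only the "failed" flag guard.
class FailedFlagGuardSketch {

    /** Hypothetical event type holding a flag like SimpleDiscoveryEvent.failed. */
    static final class Event {
        final AtomicBoolean failed = new AtomicBoolean(false);
    }

    /**
     * Reports a failure for the same event from 'reporters' threads at once and
     * returns how many reports won the compareAndSet, i.e. how many reconnect
     * tasks a guard of this shape would have scheduled.
     */
    static int concurrentReports(final Event event, int reporters) {
        final AtomicInteger scheduled = new AtomicInteger();
        final CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(reporters);
        for (int i = 0; i < reporters; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        start.await(); // line all reporters up before racing
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // Same guard shape as in serviceFailed(): only one caller flips the flag.
                    if (event.failed.compareAndSet(false, true)) {
                        scheduled.incrementAndGet();
                    }
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("reporters did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reporters", e);
        }
        return scheduled.get();
    }

    public static void main(String[] args) {
        Event event = new Event();
        // One of the eight simultaneous reports is accepted.
        System.out.println("first burst: " + concurrentReports(event, 8));
        // The flag is still set, so every further report is ignored.
        System.out.println("while flag is set: " + concurrentReports(event, 8));
        // After a reset, a report for the same event is accepted again.
        event.failed.set(false);
        System.out.println("after reset: " + concurrentReports(event, 8));
    }
}
```

The sketch only demonstrates that the guard is effective for as long as the flag remains set; what happens to failure reports that arrive after the reset is outside what it models.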