[ https://issues.apache.org/jira/browse/AMQ-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-4159:
-------------------------------

    Summary: Race condition in SimpleDiscoveryAgent creates multiple concurrent threads attempting to connect to the same bridge --- can result in deadlock  (was: Lack of thread-safety in SimpleDiscoveryAgent can cause multiple concurrent attempts to establish bridge, which can result in permanent bridge failure)

> Race condition in SimpleDiscoveryAgent creates multiple concurrent threads attempting to connect to the same bridge --- can result in deadlock
> ----------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4159
>                 URL: https://issues.apache.org/jira/browse/AMQ-4159
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.8.0
>            Reporter: Stirling Chow
>
> Symptom
> =======
> I was diagnosing a deadlock issue in {{DiscoveryNetworkConnector}} and noticed that during one of the tests, concurrent calls were being made to {{DiscoveryNetworkConnector.onServiceAdd}} for the same {{DiscoveryEvent}}. This was unexpected because only a single service (URL) had been given to {{SimpleDiscoveryAgent}}. In fact, during one of the tests I observed dozens of concurrent calls.
> Concurrent attempts to establish a bridge to the *same* remote broker are problematic because they expose a number of race conditions in {{DiscoveryNetworkConnector}} and {{RegionBroker}} that can lead to permanent bridge failure, as well as unnecessary thread pool execution/resource usage and logging.
> The race conditions will be filed as separate issues. This issue specifically addresses the bug that causes {{SimpleDiscoveryAgent}} to uncontrollably multiply bridge connection attempts.
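One way to observe the symptom is to wrap the discovery listener and record how many {{onServiceAdd}} calls are in progress at the same time. The following is a minimal, self-contained sketch of that idea, not the attached unit test: {{ServiceAddListener}} and {{DetectingListener}} are hypothetical stand-ins for ActiveMQ's {{DiscoveryListener}}, and the service URL is made up. The {{main}} method forces two overlapping calls with a latch so the detector has something to report.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentCallDetector {

    // Hypothetical stand-in for ActiveMQ's DiscoveryListener; only the
    // method relevant to this issue is modeled.
    interface ServiceAddListener {
        void onServiceAdd(String serviceUrl);
    }

    // Wraps a listener and records the highest number of onServiceAdd
    // calls that were in progress at the same time.
    static final class DetectingListener implements ServiceAddListener {
        private final ServiceAddListener delegate;
        private final AtomicInteger inFlight = new AtomicInteger();
        private final AtomicInteger maxInFlight = new AtomicInteger();

        DetectingListener(ServiceAddListener delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onServiceAdd(String serviceUrl) {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            try {
                delegate.onServiceAdd(serviceUrl);
            } finally {
                inFlight.decrementAndGet();
            }
        }

        int maxConcurrentCalls() {
            return maxInFlight.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Each caller blocks inside the delegate until the other has entered,
        // which forces the two onServiceAdd calls to overlap.
        CountDownLatch bothInside = new CountDownLatch(2);
        DetectingListener detector = new DetectingListener(url -> {
            bothInside.countDown();
            try {
                bothInside.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread t1 = new Thread(() -> detector.onServiceAdd("tcp://remote:61616"));
        Thread t2 = new Thread(() -> detector.onServiceAdd("tcp://remote:61616"));
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println("max concurrent onServiceAdd calls: " + detector.maxConcurrentCalls());
    }
}
```

Running it should print {{max concurrent onServiceAdd calls: 2}}. In a real test, a value greater than 1 for a single configured service would indicate the problem described in this issue.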
> Cause
> =====
> When {{DemandForwardingBridgeSupport}} handles exceptions from either the local or the remote side of the bridge, it fires a "bridge failed" event:
> {code:title=DemandForwardingBridgeSupport.java}
>     public void serviceLocalException(Throwable error) {
>         if (!disposed.get()) {
>             LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
>             LOG.debug("The local Exception was:" + error, error);
>             brokerService.getTaskRunnerFactory().execute(new Runnable() {
>                 public void run() {
>                     ServiceSupport.dispose(getControllingService());
>                 }
>             });
>             fireBridgeFailed();
>         }
>     }
>
>     public void serviceRemoteException(Throwable error) {
>         if (!disposed.get()) {
>             if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
>                 LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
>             } else {
>                 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
>             }
>             LOG.debug("The remote Exception was: " + error, error);
>             brokerService.getTaskRunnerFactory().execute(new Runnable() {
>                 public void run() {
>                     ServiceSupport.dispose(getControllingService());
>                 }
>             });
>             fireBridgeFailed();
>         }
>     }
>
>     private void fireBridgeFailed() {
>         NetworkBridgeListener l = this.networkBridgeListener;
>         if (l != null) {
>             l.bridgeFailed();
>         }
>     }
> {code}
> {{DiscoveryNetworkConnector}} is the {{NetworkBridgeListener}}, and its {{bridgeFailed}} method calls back to {{SimpleDiscoveryAgent.serviceFailed(...)}}:
> {code:title=DiscoveryNetworkConnector.java}
>     protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
>         class DiscoverNetworkBridgeListener extends MBeanNetworkListener {
>
>             public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) {
>                 super(brokerService, connectorName);
>             }
>
>             public void bridgeFailed() {
>                 if (!serviceSupport.isStopped()) {
>                     try {
>                         discoveryAgent.serviceFailed(event);
>                     } catch (IOException e) {
>                     }
>                 }
>             }
>         }
> ...
> {code}
> In response, {{SimpleDiscoveryAgent.serviceFailed(...)}} pauses for the {{reconnectDelay}} before attempting to re-establish the bridge via {{DiscoveryNetworkConnector.onServiceAdd(...)}}:
> {code:title=SimpleDiscoveryAgent.java}
>     public void serviceFailed(DiscoveryEvent devent) throws IOException {
>         final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
>         if (event.failed.compareAndSet(false, true)) {
>             listener.onServiceRemove(event);
>             taskRunner.execute(new Runnable() {
>                 public void run() {
>                     // We detect a failed connection attempt because the service
>                     // fails right away.
>                     if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
>                         LOG.debug("Failure occurred soon after the discovery event was generated. It will be classified as a connection failure: "+event);
>                         ...
>                         synchronized (sleepMutex) {
>                             try {
>                                 if (!running.get()) {
>                                     LOG.debug("Reconnecting disabled: stopped");
>                                     return;
>                                 }
>                                 LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
>                                 sleepMutex.wait(event.reconnectDelay);
>                             } catch (InterruptedException ie) {
>                                 LOG.debug("Reconnecting disabled: " + ie);
>                                 Thread.currentThread().interrupt();
>                                 return;
>                             }
>                         }
>                     ...
>                     event.connectTime = System.currentTimeMillis();
>                     event.failed.set(false);
>                     listener.onServiceAdd(event);
>                 }
>             }, "Simple Discovery Agent");
>         }
>     }
> {code}
> *NOTE*: the call to {{listener.onServiceAdd(...)}} is made by a new thread!
> There are two race conditions that allow {{SimpleDiscoveryAgent.serviceFailed(...)}} to launch more than one thread, each attempting to re-establish the same bridge.
> First, note that {{DemandForwardingBridgeSupport.serviceLocal/RemoteException(...)}} launches a separate thread that stops the bridge:
> {code:title=DemandForwardingBridgeSupport.java}
>     public void serviceRemoteException(Throwable error) {
>         if (!disposed.get()) {
>             if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
>                 LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
>             } else {
>                 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
>             }
>             LOG.debug("The remote Exception was: " + error, error);
>             brokerService.getTaskRunnerFactory().execute(new Runnable() {
>                 public void run() {
>                     ServiceSupport.dispose(getControllingService());
>                 }
>             });
>             fireBridgeFailed();
>         }
>     }
>
>     public void stop() throws Exception {
>         if (started.compareAndSet(true, false)) {
>             if (disposed.compareAndSet(false, true)) {
>                 LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
>                 NetworkBridgeListener l = this.networkBridgeListener;
>                 if (l != null) {
>                     l.onStop(this);
>                 }
>                 ...
> {code}
> When the bridge stops, the {{disposed}} flag is set, which prevents subsequent calls to {{serviceLocalException/serviceRemoteException(...)}} from calling {{fireBridgeFailed()}}. However, {{DemandForwardingBridgeSupport.stop()}} is called from a separate thread, so {{disposed}} may still be {{false}} when further exceptions arrive. As a result, multiple {{serviceLocalException/serviceRemoteException}} calls made in quick succession can each call {{fireBridgeFailed()}}.
> This is the first race condition: multiple calls can be made to {{DiscoveryNetworkConnector.bridgeFailed()}} for the same bridge. By transitivity, this results in multiple calls to {{SimpleDiscoveryAgent.serviceFailed(...)}}.
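The first race condition can be reproduced deterministically in a small model. This is a sketch under stated assumptions, not ActiveMQ code: {{BridgeModel}} is a hypothetical stand-in for {{DemandForwardingBridgeSupport}}, a {{CountDownLatch}} artificially delays the dispose task so that it runs after the exceptions arrive, and {{fireBridgeFailed()}} only counts invocations instead of notifying a {{NetworkBridgeListener}}.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class FirstRaceModel {

    // Hypothetical, simplified model of DemandForwardingBridgeSupport's
    // exception handling; it is not the ActiveMQ class.
    static final class BridgeModel {
        private final AtomicBoolean disposed = new AtomicBoolean(false);
        private final AtomicInteger bridgeFailedCalls = new AtomicInteger();
        private final ExecutorService taskRunner;
        private final CountDownLatch allowDispose;

        BridgeModel(ExecutorService taskRunner, CountDownLatch allowDispose) {
            this.taskRunner = taskRunner;
            this.allowDispose = allowDispose;
        }

        // Mirrors serviceRemoteException(): 'disposed' is checked here, but it
        // is only set later by stop(), which runs on another thread.
        void serviceRemoteException(Throwable error) {
            if (!disposed.get()) {
                taskRunner.execute(this::stop);
                fireBridgeFailed();
            }
        }

        // Mirrors stop(): blocks until the latch is released, simulating a
        // dispose task that is scheduled after further exceptions arrive.
        void stop() {
            try {
                allowDispose.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            disposed.compareAndSet(false, true);
        }

        // Stands in for notifying the NetworkBridgeListener.
        private void fireBridgeFailed() {
            bridgeFailedCalls.incrementAndGet();
        }

        int bridgeFailedCalls() {
            return bridgeFailedCalls.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService taskRunner = Executors.newCachedThreadPool();
        CountDownLatch allowDispose = new CountDownLatch(1);
        BridgeModel bridge = new BridgeModel(taskRunner, allowDispose);

        // Two remote exceptions in quick succession, before any dispose task has run.
        bridge.serviceRemoteException(new RuntimeException("first"));
        bridge.serviceRemoteException(new RuntimeException("second"));

        // Let the dispose tasks run; 'disposed' becomes true.
        allowDispose.countDown();
        taskRunner.shutdown();
        taskRunner.awaitTermination(5, TimeUnit.SECONDS);

        // A third exception is now filtered out by the 'disposed' check.
        bridge.serviceRemoteException(new RuntimeException("third"));

        System.out.println("bridgeFailed() calls: " + bridge.bridgeFailedCalls());
    }
}
```

Running it should print {{bridgeFailed() calls: 2}}: both early exceptions pass the {{disposed}} check because the asynchronous stop has not yet run, while the third exception, arriving after the dispose tasks have completed, is filtered.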
> {{SimpleDiscoveryAgent.serviceFailed(...)}} has a guard clause, {{event.failed.compareAndSet(false, true)}}, that should only allow the first call to launch a bridge reconnect thread. However, once the {{reconnectDelay}} expires, {{event.failed}} is reset to {{false}}, which allows re-entry to the failure-handling logic and the possible launch of additional bridge reconnect threads if the {{reconnectDelay}} is short or the threads calling {{serviceFailed}} are delayed.
> This is the second race condition: the guard clause in {{SimpleDiscoveryAgent.serviceFailed(...)}} can be reset before the subsequent redundant calls have been filtered out.
> Together, these two race conditions allow a single call to {{DiscoveryNetworkConnector.onServiceAdd(...)}} to result in multiple subsequent concurrent (re)calls, and each of these concurrent calls can in turn spawn multiple concurrent calls of its own. The result can be an unbounded number of concurrent calls to {{onServiceAdd(...)}}.
> Unit Test
> =========
> The attached unit test demonstrates this bug by simulating a bridge failure that has yet to be detected by the remote broker (i.e., before the {{InactivityMonitor}} closes the connection). The local broker attempts to re-establish the bridge, but its call to {{DemandForwardingBridgeSupport.startRemoteBridge()}} fails because the remote broker rejects the new connection while the old one still exists. Since {{startRemoteBridge}} sends multiple messages to the remote broker, multiple exceptions are generated:
> {code:title=DemandForwardingBridgeSupport.java}
>     protected void startRemoteBridge() throws Exception {
>         ...
>         remoteBroker.oneway(brokerInfo);
>         ...
>         remoteBroker.oneway(remoteConnectionInfo);
>         ...
>         remoteBroker.oneway(producerInfo);
>         ...
>         remoteBroker.oneway(demandConsumerInfo);
>     }
> {code}
> The multiple exceptions result in multiple calls to {{DemandForwardingBridgeSupport.serviceRemoteException(...)}}, which allows the first race condition to be exhibited.
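The second race condition, where the guard is reset before a redundant notification is filtered, can be modeled the same way. In this hypothetical sketch (not ActiveMQ code), {{Event}} keeps only the {{failed}} guard flag of {{SimpleDiscoveryEvent}}, the reconnect wait is omitted to model a {{reconnectDelay}} of 0, and {{Runnable::run}} is used as the {{Executor}} so that each reconnect task finishes, resetting the guard, before the redundant notification is delivered.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class SecondRaceModel {

    // Hypothetical stand-in for SimpleDiscoveryEvent: only the guard flag.
    static final class Event {
        final AtomicBoolean failed = new AtomicBoolean(false);
    }

    // Hypothetical model of the unpatched SimpleDiscoveryAgent.serviceFailed().
    static final class AgentModel {
        private final Executor taskRunner;
        final AtomicInteger reconnectsLaunched = new AtomicInteger();
        final AtomicInteger serviceAddCalls = new AtomicInteger();

        AgentModel(Executor taskRunner) {
            this.taskRunner = taskRunner;
        }

        void serviceFailed(Event event) {
            if (event.failed.compareAndSet(false, true)) {
                reconnectsLaunched.incrementAndGet();
                taskRunner.execute(() -> {
                    // reconnectDelay wait omitted: models reconnectDelay = 0
                    event.failed.set(false);           // resets the guard on the SAME event
                    serviceAddCalls.incrementAndGet(); // stands in for listener.onServiceAdd(event)
                });
            }
        }
    }

    public static void main(String[] args) {
        // Runnable::run executes each reconnect task immediately on the caller,
        // so the guard is reset before the redundant notification arrives.
        AgentModel agent = new AgentModel(Runnable::run);
        Event event = new Event();

        agent.serviceFailed(event); // first bridgeFailed() notification
        agent.serviceFailed(event); // redundant notification for the same failure

        System.out.println("reconnect attempts launched: " + agent.reconnectsLaunched.get());
    }
}
```

It should print {{reconnect attempts launched: 2}}: the redundant notification passes the guard because the first reconnect task has already reset {{event.failed}}. With a real thread pool the same outcome depends on timing, which is why the 0s {{reconnectDelay}} makes it far more likely.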
> The first unit test has a 1s {{reconnectDelay}}, which is sufficient to make the second race condition improbable; therefore, this test generally passes.
> The second unit test has a 0s {{reconnectDelay}}; on my system, this makes the timing of the multiple calls to {{DemandForwardingBridgeSupport.serviceRemoteException(...)}} such that the second race condition is exhibited, and the unit test fails because it detects concurrent calls to {{DiscoveryNetworkConnector.onServiceAdd(...)}}.
> Solution
> ========
> While it would be possible to add a {{failed.compareAndSet(false, true)}} guard clause to {{DemandForwardingBridgeSupport.fireBridgeFailed()}}, and thereby prevent the first race condition from allowing multiple calls to {{SimpleDiscoveryAgent.serviceFailed()}}, the root problem is the race condition in {{serviceFailed}}. This can be trivially addressed by making a copy of the {{DiscoveryEvent}}, which prevents the original {{event.failed}} guard clause from being reset:
> {code:title=Patched SimpleDiscoveryAgent.java}
>     public void serviceFailed(DiscoveryEvent devent) throws IOException {
>         final SimpleDiscoveryEvent sevent = (SimpleDiscoveryEvent)devent;
>         if (sevent.failed.compareAndSet(false, true)) {
>             listener.onServiceRemove(sevent);
>             taskRunner.execute(new Runnable() {
>                 public void run() {
>                     SimpleDiscoveryEvent event = new SimpleDiscoveryEvent(sevent);
>                     ...
>                     event.connectTime = System.currentTimeMillis();
>                     event.failed.set(false);
>                     listener.onServiceAdd(event);
>                 }
>             }, "Simple Discovery Agent");
>         }
>     }
> {code}
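The effect of the proposed patch can be checked against a similar model. Again, this is a hypothetical sketch rather than the actual patch: {{Event}} stands in for {{SimpleDiscoveryEvent}}, its copy constructor mimics {{new SimpleDiscoveryEvent(sevent)}}, and the sketch assumes, based on the {{createBridge(...)}} excerpt in the Cause section, that a re-established bridge reports later failures using the event it was created with, i.e. the copy.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class PatchedAgentModel {

    // Hypothetical stand-in for SimpleDiscoveryEvent: a service name plus the
    // 'failed' guard flag, with a copy constructor like the one the patch uses.
    static final class Event {
        final String serviceName;
        final AtomicBoolean failed = new AtomicBoolean(false);

        Event(String serviceName) {
            this.serviceName = serviceName;
        }

        // The copy gets its own, independent guard flag.
        Event(Event copy) {
            this(copy.serviceName);
        }
    }

    private final Executor taskRunner;
    final AtomicInteger reconnectsLaunched = new AtomicInteger();
    volatile Event lastAdded; // the event most recently passed to the onServiceAdd stand-in

    PatchedAgentModel(Executor taskRunner) {
        this.taskRunner = taskRunner;
    }

    // Models the patched serviceFailed(): the reconnect task resets and
    // re-announces a copy, so the caller's event keeps failed == true.
    void serviceFailed(Event sevent) {
        if (sevent.failed.compareAndSet(false, true)) {
            reconnectsLaunched.incrementAndGet();
            taskRunner.execute(() -> {
                Event event = new Event(sevent);
                // (reconnectDelay wait and connectTime bookkeeping omitted)
                event.failed.set(false);
                lastAdded = event; // stands in for listener.onServiceAdd(event)
            });
        }
    }

    public static void main(String[] args) {
        PatchedAgentModel agent = new PatchedAgentModel(Runnable::run);
        Event original = new Event("tcp://remote:61616");

        agent.serviceFailed(original); // first notification: reconnect launched
        agent.serviceFailed(original); // redundant notification: filtered
        System.out.println("after duplicate: " + agent.reconnectsLaunched.get());

        // A later failure of the re-established bridge reports the copy.
        agent.serviceFailed(agent.lastAdded);
        System.out.println("after new failure: " + agent.reconnectsLaunched.get());
    }
}
```

It should print {{after duplicate: 1}} and then {{after new failure: 2}}: the original event's guard stays {{true}}, so late duplicates for the old bridge are dropped, while a genuine failure of the new bridge is still handled through the copy's own flag.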