activemq-dev mailing list archives

From "Stirling Chow (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AMQ-4159) Race condition in SimpleDiscoveryAgent creates multiple concurrent threads attempting to connect to the same bridge --- can result in deadlock
Date Sat, 03 Nov 2012 23:35:16 GMT

     [ https://issues.apache.org/jira/browse/AMQ-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-4159:
-------------------------------

    Description: 
Symptom
=======
I was diagnosing a deadlock issue in {{DiscoveryNetworkConnector}} and noticed that during
one of the tests, concurrent calls were being made to {{DiscoveryNetworkConnector.onServiceAdd(...)}}
for the same {{DiscoveryEvent}}.  This was unexpected because only a single service (URL)
had been given to {{SimpleDiscoveryAgent}}.  In fact, during one of the tests I observed dozens
of concurrent calls.

Concurrent attempts to establish a bridge to the *same* remote broker are problematic because
they expose a number of race conditions in {{DiscoveryNetworkConnector}} and {{RegionBroker}}
that can lead to permanent bridge failure, as well as unnecessary thread pool execution/resource
usage and logging.

The issues with {{DiscoveryNetworkConnector}} and {{RegionBroker}} will be filed as separate
issues.  This issue specifically addresses the bug that causes {{SimpleDiscoveryAgent}} to
uncontrollably multiply bridge connection attempts.

Cause
=====
When {{DemandForwardingBridgeSupport}} handles exceptions from either the local or remote
sides of the bridge, it fires a "bridge failed" event:

{code:title=DemandForwardingBridgeSupport.java}
public void serviceLocalException(Throwable error) {
    if (!disposed.get()) {
        LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
        LOG.debug("The local Exception was:" + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}


public void serviceRemoteException(Throwable error) {
    if (!disposed.get()) {
        if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
            LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
        } else {
            LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
        }
        LOG.debug("The remote Exception was: " + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}

private void fireBridgeFailed() {
    NetworkBridgeListener l = this.networkBridgeListener;
    if (l != null) {
        l.bridgeFailed();
    }
}
{code}

{{DiscoveryNetworkConnector}} is the {{NetworkBridgeListener}}, and its {{bridgeFailed()}}
method calls back to {{SimpleDiscoveryAgent.serviceFailed(...)}}:

{code:title=DiscoveryNetworkConnector.java}
protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
    class DiscoverNetworkBridgeListener extends MBeanNetworkListener {

        public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) {
            super(brokerService, connectorName);
        }

        public void bridgeFailed() {
            if (!serviceSupport.isStopped()) {
                try {
                    discoveryAgent.serviceFailed(event);
                } catch (IOException e) {
                }
            }

        }
    }
...
{code}

In response, {{SimpleDiscoveryAgent.serviceFailed(...)}} pauses for the {{reconnectDelay}}
before attempting to re-establish the bridge via {{DiscoveryNetworkConnector.onServiceAdd(...)}}:

{code:title=SimpleDiscoveryAgent.java}
public void serviceFailed(DiscoveryEvent devent) throws IOException {

    final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
    if (event.failed.compareAndSet(false, true)) {

        listener.onServiceRemove(event);
        taskRunner.execute(new Runnable() {
            public void run() {
                // We detect a failed connection attempt because the service
                // fails right away.
                if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
                    LOG.debug("Failure occurred soon after the discovery event was generated.  It will be classified as a connection failure: "+event);
...
                    synchronized (sleepMutex) {
                        try {
                            if (!running.get()) {
                                LOG.debug("Reconnecting disabled: stopped");
                                return;
                            }

                            LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
                            sleepMutex.wait(event.reconnectDelay);
                        } catch (InterruptedException ie) {
                            LOG.debug("Reconnecting disabled: " + ie);
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
...
                event.connectTime = System.currentTimeMillis();
                event.failed.set(false);
                listener.onServiceAdd(event);
            }
        }, "Simple Discovery Agent");
    }
}
{code}

*NOTE*: the call to {{listener.onServiceAdd(...)}} is made by a new thread!

There are two race conditions that allow {{SimpleDiscoveryAgent.serviceFailed(...)}} to launch
more than one thread, each attempting to re-establish the same bridge.

First, note that {{DemandForwardingBridgeSupport.serviceLocal/RemoteException(...)}} launches
a separate thread that stops the bridge:

{code:title=DemandForwardingBridgeSupport.java}
public void serviceRemoteException(Throwable error) {
    if (!disposed.get()) {
        if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
            LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
        } else {
            LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
        }
        LOG.debug("The remote Exception was: " + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}

public void stop() throws Exception {
    if (started.compareAndSet(true, false)) {
        if (disposed.compareAndSet(false, true)) {
            LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
            NetworkBridgeListener l = this.networkBridgeListener;
            if (l != null) {
                l.onStop(this);
            }
...
{code}

When the bridge stops, the {{disposed}} flag is set, which prevents subsequent calls to {{serviceLocal/RemoteException(...)}}
from calling {{fireBridgeFailed()}}.  However, since the call to {{DemandForwardingBridgeSupport.stop()}}
is made by a separate thread, multiple {{serviceLocal/RemoteException(...)}} calls that are
made in quick succession can result in multiple calls to {{fireBridgeFailed()}}.

This is the first race condition: multiple calls can be made to {{DiscoveryNetworkConnector.bridgeFailed()}}
for the same bridge.  By transitivity, this results in multiple calls to {{SimpleDiscoveryAgent.serviceFailed(...)}}.
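
The first race condition is a check-then-act on {{disposed}} whose "act" happens later on another thread.  The following is a minimal, single-threaded sketch of that interleaving and not the real bridge code: {{BridgeFailureSketch}}, {{pendingTasks}} and {{drain()}} are hypothetical names, and the queue merely stands in for a task runner that has not yet scheduled the dispose task.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the bridge; NOT the real DemandForwardingBridgeSupport.
class BridgeFailureSketch {
    final AtomicBoolean disposed = new AtomicBoolean(false);
    final AtomicInteger bridgeFailedCalls = new AtomicInteger();

    // Stands in for the task runner: queued tasks run only when drain() is
    // called, which models a dispose thread that has not been scheduled yet.
    final Queue<Runnable> pendingTasks = new ArrayDeque<>();

    void serviceRemoteException(Throwable error) {
        if (!disposed.get()) {                          // check
            pendingTasks.add(() -> disposed.set(true)); // flag is set later, elsewhere
            bridgeFailedCalls.incrementAndGet();        // stands in for fireBridgeFailed()
        }
    }

    // Runs every queued dispose task, as the task runner eventually would.
    void drain() {
        Runnable task;
        while ((task = pendingTasks.poll()) != null) {
            task.run();
        }
    }

    public static void main(String[] args) {
        BridgeFailureSketch bridge = new BridgeFailureSketch();
        bridge.serviceRemoteException(new RuntimeException("first"));
        bridge.serviceRemoteException(new RuntimeException("second")); // still passes the check
        bridge.drain();                                                // dispose finally runs
        bridge.serviceRemoteException(new RuntimeException("third")); // now filtered out
        // prints "bridgeFailed calls: 2"
        System.out.println("bridgeFailed calls: " + bridge.bridgeFailedCalls.get());
    }
}
```

Every exception that arrives before the dispose task runs passes the {{disposed}} check, so the number of "bridge failed" notifications depends purely on scheduling.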

{{SimpleDiscoveryAgent.serviceFailed(...)}} has a guard clause, {{event.failed.compareAndSet(false,
true)}}, that should only allow the first call to launch a bridge reconnect thread.  However, once
the {{reconnectDelay}} expires, {{event.failed}} is reset to {{false}}, which allows re-entry
to the failure handling logic and makes it possible to launch additional bridge reconnect
threads if the {{reconnectDelay}} is short or the threads calling {{serviceFailed}} are delayed.

This is the second race condition: the guard clause in {{SimpleDiscoveryAgent.serviceFailed(...)}}
can be reset before the subsequent redundant calls have been filtered out.

These two race conditions allow a single call to {{DiscoveryNetworkConnector.onServiceAdd(...)}}
to result in multiple subsequent concurrent (re)calls, and these concurrent calls can spawn
their own multiple concurrent calls.  The result can be an unlimited number of concurrent
calls to {{onServiceAdd(...)}}.
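
The second race condition can be reproduced deterministically if the reconnect task is run inline, which models a {{reconnectDelay}} that has already expired by the time a late, redundant failure report arrives.  This is only a sketch under that assumption: {{GuardResetSketch}} and its nested {{Event}} class are hypothetical stand-ins, not the real {{SimpleDiscoveryAgent}} or {{SimpleDiscoveryEvent}}.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the unpatched agent; NOT the real SimpleDiscoveryAgent.
class GuardResetSketch {
    // Hypothetical stand-in for SimpleDiscoveryEvent: only the guard flag is modelled.
    static class Event {
        final AtomicBoolean failed = new AtomicBoolean(false);
    }

    final AtomicInteger reconnectAttempts = new AtomicInteger();

    // Mirrors the unpatched flow. The reconnect task runs inline here, which
    // models a reconnectDelay that expired before the next caller arrived.
    void serviceFailed(Event event) {
        if (event.failed.compareAndSet(false, true)) {
            event.failed.set(false);             // guard is re-opened on the shared event
            reconnectAttempts.incrementAndGet(); // stands in for listener.onServiceAdd(event)
        }
    }

    public static void main(String[] args) {
        GuardResetSketch agent = new GuardResetSketch();
        Event event = new Event();
        agent.serviceFailed(event); // first report of the failure
        agent.serviceFailed(event); // delayed, redundant report of the SAME failure
        // prints "reconnect attempts: 2"
        System.out.println("reconnect attempts: " + agent.reconnectAttempts.get());
    }
}
```

Because the flag that filters duplicates is the same flag that the reconnect task clears, a duplicate that arrives after the reset is indistinguishable from a new failure.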

Unit Test
=========
The attached unit test demonstrates this bug by simulating a bridge failure that has yet to
be detected by the remote broker (i.e., before the {{InactivityMonitor}} closes the connection).
The local broker attempts to re-establish the bridge, but its call to {{DemandForwardingBridgeSupport.startRemoteBridge()}}
fails because the remote broker rejects the new connection while the old one still exists.
Since {{startRemoteBridge()}} sends multiple messages to the remote broker, multiple exceptions
are generated:

{code:title=DemandForwardingBridgeSupport.java}
protected void startRemoteBridge() throws Exception {
...
                remoteBroker.oneway(brokerInfo);
...
            remoteBroker.oneway(remoteConnectionInfo);
...
            remoteBroker.oneway(producerInfo);
...
                remoteBroker.oneway(demandConsumerInfo);
}
{code}

The multiple exceptions result in multiple calls to {{DemandForwardingBridgeSupport.serviceRemoteException(...)}},
which allows the first race condition to be exhibited.  

The first unit test has a 1s {{reconnectDelay}}, which is sufficient to make the second race
condition improbable; therefore, this test generally passes. 

The second unit test has a 0s {{reconnectDelay}}; on my system, this makes the timing of multiple
calls to {{DemandForwardingBridgeSupport.serviceRemoteException(...)}} such that the second
race condition is exhibited, resulting in the unit test failing because it detects concurrent
calls to {{DiscoveryNetworkConnector.onServiceAdd(...)}}.
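
The attached test is not reproduced in this message.  As an illustration only, one way a test could detect overlapping calls to {{onServiceAdd(...)}} is to track how many callers are inside the method at once.  {{ConcurrentCallDetector}} and its methods are hypothetical names and are not taken from the attachment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; not taken from the attached unit test.
class ConcurrentCallDetector {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    // Call on entry to the method under observation.
    void enter() {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max); // remember the high-water mark
    }

    // Call on exit (ideally in a finally block).
    void exit() {
        inFlight.decrementAndGet();
    }

    int maxConcurrent() {
        return maxInFlight.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentCallDetector detector = new ConcurrentCallDetector();
        CountDownLatch bothInside = new CountDownLatch(2);

        // Stands in for onServiceAdd(...): each caller waits until the other
        // caller is also inside, which forces the calls to overlap.
        Runnable onServiceAdd = () -> {
            detector.enter();
            try {
                bothInside.countDown();
                bothInside.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                detector.exit();
            }
        };

        Thread a = new Thread(onServiceAdd);
        Thread b = new Thread(onServiceAdd);
        a.start();
        b.start();
        a.join();
        b.join();
        // prints "max concurrent calls: 2"
        System.out.println("max concurrent calls: " + detector.maxConcurrent());
    }
}
```

A test built this way would fail whenever {{maxConcurrent()}} exceeds 1 for a single configured service.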

Solution
========
While it would be possible to add a {{failed.compareAndSet(false, true)}} guard clause to
{{DemandForwardingBridgeSupport.fireBridgeFailed()}}, and so prevent the first race condition
from allowing multiple calls to {{SimpleDiscoveryAgent.serviceFailed()}}, the root problem
is the race condition in {{serviceFailed}}.  This can be trivially addressed by making a copy
of the {{DiscoveryEvent}} and resetting {{failed}} only on the copy, which prevents the
original {{event.failed}} guard from being reset:

{code:title=Patched SimpleDiscoveryAgent.java}
public void serviceFailed(DiscoveryEvent devent) throws IOException {

    final SimpleDiscoveryEvent sevent = (SimpleDiscoveryEvent)devent;
    if (sevent.failed.compareAndSet(false, true)) {

        listener.onServiceRemove(sevent);
        taskRunner.execute(new Runnable() {
            public void run() {
                SimpleDiscoveryEvent event = new SimpleDiscoveryEvent(sevent);

...
                event.connectTime = System.currentTimeMillis();
                event.failed.set(false);
                listener.onServiceAdd(event);
            }
        }, "Simple Discovery Agent");
    }
}
{code} 
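
The effect of the copy can be checked with the same inline model used for the second race condition.  This is a sketch under stated assumptions, not the patch itself: {{CopiedEventSketch}} and its nested {{Event}} class are hypothetical, and the copy constructor is assumed to give the copy its own {{failed}} flag.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the patched agent; NOT the real SimpleDiscoveryAgent.
class CopiedEventSketch {
    // Hypothetical stand-in for SimpleDiscoveryEvent.
    static class Event {
        final AtomicBoolean failed = new AtomicBoolean(false);

        Event() {
        }

        // Assumed copy semantics: the copy gets its OWN flag object.
        Event(Event other) {
            this.failed.set(other.failed.get());
        }
    }

    final AtomicInteger reconnectAttempts = new AtomicInteger();

    // Mirrors the patched flow, with the reconnect task run inline.
    void serviceFailed(Event sevent) {
        if (sevent.failed.compareAndSet(false, true)) {
            Event event = new Event(sevent);     // work on a copy from here on
            event.failed.set(false);             // resets the copy, not the original
            reconnectAttempts.incrementAndGet(); // stands in for listener.onServiceAdd(event)
        }
    }

    public static void main(String[] args) {
        CopiedEventSketch agent = new CopiedEventSketch();
        Event original = new Event();
        agent.serviceFailed(original); // first report of the failure
        agent.serviceFailed(original); // delayed, redundant report: filtered by the guard
        // prints "reconnect attempts: 1"
        System.out.println("reconnect attempts: " + agent.reconnectAttempts.get());
    }
}
```

The original event stays marked as failed, so any late duplicate for the old bridge is dropped, while the new bridge is created against the copy and can report its own failures.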


    
> Race condition in SimpleDiscoveryAgent creates multiple concurrent threads attempting
to connect to the same bridge --- can result in deadlock
> ----------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4159
>                 URL: https://issues.apache.org/jira/browse/AMQ-4159
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.8.0
>            Reporter: Stirling Chow
>

