camel-users mailing list archives

From nimruhil <nimru...@yandex.ru>
Subject The most proper way to isolate statefull beans during concurrent exchanges
Date Wed, 24 Sep 2014 11:41:36 GMT
Let's say there is a route from which a stateful bean is invoked:

<camel:route id="Concurrently-called-route">
    <camel:from uri="direct:concurrentlyCalledRoute"/>
    <camel:bean ref="statefullBean" method="setSomeState"/>
    <camel:bean ref="statefullBean" method="getSomeDataDependingOnState"/>
</camel:route>

Messages can be sent along this route concurrently, i.e. the requestBody method of ProducerTemplate
is called from several threads at once. A problem arises when two exchanges are in flight and
setSomeState is called for one exchange between the calls to setSomeState and getSomeDataDependingOnState
made for the other: the second exchange then reads state that the first one set. I see two ways
to solve this problem, each of which has a drawback.
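
The problematic interleaving can be replayed deterministically on a single thread. The sketch below is plain Java rather than Camel code; StatefulBeanSketch is a hypothetical stand-in for the class behind statefullBean, which is not shown here, and the "data for ..." return value is invented for illustration.

```java
// Hypothetical stand-in for the bean behind ref="statefullBean" (assumption: one String of state).
class StatefulBeanSketch {
    private String state;

    void setSomeState(String state) {
        this.state = state;
    }

    String getSomeDataDependingOnState() {
        return "data for " + state;
    }
}

class InterleavingSketch {
    // Replays, in order, the calls that two concurrent exchanges could make on the shared bean.
    static String resultSeenByExchangeA() {
        StatefulBeanSketch shared = new StatefulBeanSketch(); // one instance shared by all exchanges
        shared.setSomeState("A");                    // exchange A: first bean call
        shared.setSomeState("B");                    // exchange B slips in between A's two calls
        return shared.getSomeDataDependingOnState(); // exchange A: second bean call
    }

    public static void main(String[] args) {
        System.out.println(resultSeenByExchangeA()); // prints "data for B"
    }
}
```

Exchange A asked for data depending on state "A" but received data for "B".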


Using SEDA

<camel:route id="Concurrently-called-route">
    <camel:from uri="direct:concurrentlyCalledRoute"/>
    <camel:to uri="seda:sedaRoute"/>
</camel:route>

<camel:route id="SEDA-route">
    <camel:from uri="seda:sedaRoute"/>
    <camel:bean ref="statefullBean" method="setSomeState"/>
    <camel:bean ref="statefullBean" method="getSomeDataDependingOnState"/>
</camel:route>

In this case, messages sent from different threads are collected in the queue of the SEDA endpoint.
Messages from this queue are handled by a single thread as they travel along SEDA-route, so processing
one message won't interfere with processing another. However, if many threads were sending
messages to concurrentlyCalledRoute, SEDA-route would become a bottleneck. And if more than
one thread were used to consume items from the SEDA queue, the problem with concurrent calls
to stateful beans would arise again.
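
The serializing effect of a single queue consumer can be imitated outside Camel with a single-threaded executor. This is only an analogy in plain Java, not the SEDA component itself; the Bean class, the pool size of 4 and the "exchange-N" ids are assumptions made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SingleConsumerSketch {
    // Hypothetical stateful bean; deliberately not thread-safe.
    static class Bean {
        private String state;
        void setSomeState(String s) { state = s; }
        String getSomeDataDependingOnState() { return "data for " + state; }
    }

    // Sends `count` two-step jobs from several caller threads to one worker thread and
    // reports whether every job read back the state it had set itself.
    static boolean allJobsSawOwnState(int count) {
        Bean shared = new Bean();
        ExecutorService consumer = Executors.newSingleThreadExecutor(); // plays the single queue consumer
        ExecutorService callers = Executors.newFixedThreadPool(4);      // plays the concurrent senders
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                String id = "exchange-" + i;
                Callable<String> job = () -> {
                    shared.setSomeState(id);
                    return shared.getSomeDataDependingOnState();
                };
                // Each caller blocks until the worker has processed its own job
                Callable<Boolean> call = () -> consumer.submit(job).get().equals("data for " + id);
                results.add(callers.submit(call));
            }
            for (Future<Boolean> result : results) {
                if (!result.get()) {
                    return false;
                }
            }
            return true;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            callers.shutdown();
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(allJobsSawOwnState(50));
    }
}
```

Because only the worker thread ever touches the bean, the two calls of one job cannot be interleaved with those of another. The price is the one described above: all callers queue up behind that one thread.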

Another way is to use a custom scope.


Custom scope

Spring Framework allows custom scopes to be implemented, so we can implement a scope that
stores a separate instance of a bean for each exchange.

public class ExchangeScope implements Scope {

    private Map<String, Map<String,Object>> instances = new ConcurrentHashMap<>();

    private Map<String,Runnable> destructionCallbacks = new ConcurrentHashMap<>();

    private final ThreadLocal<String> currentExchangeId = new ThreadLocal<>();

    public void activate(String exchangeId) {
        if (!this.instances.containsKey(exchangeId)) {
            Map<String, Object> instancesInCurrentExchangeScope = new ConcurrentHashMap<>();
            this.instances.put(exchangeId, instancesInCurrentExchangeScope);
        }
        this.currentExchangeId.set(exchangeId);
    }

    public void destroy() {
        String currentExchangeId = this.currentExchangeId.get();
        Map<String,Object> instancesInCurrentExchangeScope = instances.get(currentExchangeId);
        if (instancesInCurrentExchangeScope == null)
            throw new RuntimeException("ExchangeScope with id = " + currentExchangeId + " doesn't exist");
        for (String name : instancesInCurrentExchangeScope.keySet()) {
            this.remove(name);
        }
        instances.remove(currentExchangeId);
        this.currentExchangeId.set(null);
    }

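    @Override
    public Object get(String name, ObjectFactory<?> objectFactory) {
        // selects by name a bean instance from a map storing instances for current exchange
        String exchangeId = this.currentExchangeId.get();
        Map<String, Object> scoped = exchangeId == null ? null : this.instances.get(exchangeId);
        if (scoped == null)
            throw new IllegalStateException("ExchangeScope is not active in the current thread");
        Object instance = scoped.get(name);
        if (instance == null) {
            // creates a new bean instance if necessary
            instance = objectFactory.getObject();
            scoped.put(name, instance);
        }
        return instance;
    }

    @Override
    public Object remove(String name) {
        // removes a bean instance from the current exchange and returns it (null if absent)
        String exchangeId = this.currentExchangeId.get();
        Map<String, Object> scoped = exchangeId == null ? null : this.instances.get(exchangeId);
        return scoped == null ? null : scoped.remove(name);
    }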

    @Override
    public void registerDestructionCallback(String name, Runnable callback) {
        this.destructionCallbacks.put(name, callback);
    }

    @Override
    public Object resolveContextualObject(String name) {
        String currentExchangeId = this.currentExchangeId.get();
        if (currentExchangeId == null)
            return null;

        Map<String,Object> instancesInCurrentExchangeScope = this.instances.get(currentExchangeId);
        if (instancesInCurrentExchangeScope == null)
            return null;

        return instancesInCurrentExchangeScope.get(name);
    }

    @Override
    public String getConversationId() {
        return this.currentExchangeId.get();
    }
}

Now we can register this custom scope and declare statefullBean as exchange scoped:

<bean id="exchangeScope" class="org.my.ExchangeScope"/>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="ExchangeScope" value-ref="exchangeScope"/>
        </map>
    </property>
</bean>

<bean id="statefullBean" class="org.my.StatefullBean" scope="ExchangeScope"/>

To use the exchange scope, we call the activate method of ExchangeScope before sending a message
and call destroy afterwards:

exchangeScope.activate(exchangeId);
try {
    producerTemplate.requestBody(request);
} finally {
    // destroy() takes no argument: it uses the exchange id bound to the current thread
    exchangeScope.destroy();
}

With this implementation, the exchange scope is actually a thread scope, and this is a drawback.
If, for example, a multithreaded splitter were used in a route, exchange-scoped beans could not
be called from the threads created by the splitter, because those calls would be made in threads
other than the one in which the exchange was started.
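
The thread-bound behaviour comes from ThreadLocal itself and can be shown without Camel or Spring. In this minimal sketch the single-thread executor merely stands in for a thread created by a splitter (an assumption for illustration), and the field mirrors currentExchangeId from ExchangeScope.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadBoundScopeSketch {
    private static final ThreadLocal<String> currentExchangeId = new ThreadLocal<>();

    // Returns what a worker thread sees in the ThreadLocal after the calling thread has set it.
    static String idSeenByWorkerThread(String exchangeId) {
        currentExchangeId.set(exchangeId); // like activate() in the thread that starts the exchange
        ExecutorService worker = Executors.newSingleThreadExecutor(); // stands in for a splitter thread
        try {
            Callable<String> lookup = () -> currentExchangeId.get();
            return worker.submit(lookup).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            currentExchangeId.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(idSeenByWorkerThread("exchange-1")); // prints "null"
    }
}
```

The worker thread sees no exchange id at all, so a scope keyed on this ThreadLocal has no instance map to consult there.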

Any ideas on how to work around these drawbacks? Are there completely different ways to isolate
stateful beans during concurrent exchanges?
