avalon-dev mailing list archives

From Leo Simons <leosim...@apache.org>
Subject seperation of concerns applied: Thread Management
Date Wed, 08 Oct 2003 21:10:59 GMT
okay, had an itch to scratch here and some time on my hands. Here's an 
essay:

===========
The Problem
===========
Steve wrote:
 > I think the real underlying issue here is component/container
 > responsibility. Who should be creating and supervising/monitoring the
 > thread - the component or the container?

In a COP (component-oriented programming) environment, a common 
question is "Which entity is responsible for a particular function?". 
It's one of the main questions we ask when we do system decomposition.

Let's analyze the problem and work towards a neat solution!

Separation of Concerns
----------------------
The key to solving this problem in a good way is proper separation of 
concerns. This will allow for a clean and evolvable system, and may even 
provide us with some components that can be reused in another environment.

For example, within avalon, the container is responsible for, at the 
least, component creation, destruction, lifecycle management, dependency 
resolution, configuration, logging, and often management. Components are 
responsible for application logic. This means that the application logic 
in these components is more likely to be reusable in other contexts.

Taking this a little further, a smart container will be built using 
components to which it delegates functionality. There might be a 
component that handles instantiation, another one which handles 
lifecycle management, another one which handles dependency resolution, 
etc. You could build an extensible container this way, where you would 
be able to change facets of its behaviour to make it usable in many 
different problem domains. Yeah!

Back to the problem at hand. What we identify here is several more 
concerns/responsibilities:

1 - thread creation
2 - thread allocation
3 - thread deallocation
4 - thread destruction
5 - thread monitoring
6 - thread-related exception handling

We might make these more fine-grained, or we might group them into a 
single concern dubbed "thread management".

Solution! or not?
-----------------
Okay, got it. Separate concerns, put system-level concerns in the 
container, specify the relationship between container and component in 
interfaces. Keep components passive (IoC).

A few solutions immediately spring to mind. One of them is to modify 
avalon-framework to specify these concerns, and our container to support 
them. Perhaps something like:

interface ThreadedComponent extends Startable
{
   /**
    * Retrieve the worker. This will then be started in a
    * container-managed thread.
    */
   Runnable getWorker();
   /**
    * Tell the component something bad happened and allow it to recover.
    * After it has, the getWorker() method will be called by the
    * container once more and a new thread will be created.
    */
   void threadProblemOccured( Exception e );
}

This is a bad idea. It requires modifying the core framework 
contracts, it requires extensive modification of the container, and it 
does not allow much reuse of thread management code across containers 
and components.

What happened? Well, we got the inversion of control figured out, and we 
separated concerns (thread creation and management is separated out from 
the business logic), but it still sucks. What we forgot was the 
component-oriented programming.

Let's do some IoC/SoC/COP analysis and come up with something better!

========
Analysis
========
The Avalon-Framework contract
-----------------------------
Most components are passive. When you invoke one of their work interface 
methods, they perform a unit of work in the current thread of execution. 
Some components are active. These perform units of work in a separate 
thread of execution.

In avalon, active components implement the "Startable" interface, and by 
contract they begin their "active" life when the container calls 
start(), and end that life when the container calls stop().
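
To make that contract concrete, here is a minimal sketch of an active 
component that does all of its own thread handling. The Startable 
interface below is a local stand-in (an assumption, so the example 
compiles without avalon-framework), and Heartbeat and HeartbeatDemo are 
hypothetical names:

```java
/** Stand-in for Avalon's Startable, declared locally so the sketch compiles alone. */
interface Startable
{
    void start() throws Exception;
    void stop() throws Exception;
}

/** Hypothetical active component that owns and supervises its own thread. */
class Heartbeat implements Startable
{
    private volatile boolean m_running;
    private Thread m_thread;

    public void start()
    {
        m_running = true;
        m_thread = new Thread( new Runnable()
        {
            public void run()
            {
                while( m_running )
                {
                    try { Thread.sleep( 10 ); }            // pretend to do work
                    catch( InterruptedException e ) { return; }
                }
            }
        } );
        m_thread.start();                                  // component becomes active
    }

    public void stop()
    {
        m_running = false;
        m_thread.interrupt();
        try { m_thread.join(); }                           // wait until passive again
        catch( InterruptedException e ) { Thread.currentThread().interrupt(); }
    }

    public boolean isActive()
    {
        return m_thread != null && m_thread.isAlive();
    }
}

class HeartbeatDemo
{
    public static void main( String[] args )
    {
        Heartbeat h = new Heartbeat();
        h.start();
        System.out.println( "active after start(): " + h.isActive() );
        h.stop();
        System.out.println( "active after stop():  " + h.isActive() );
    }
}
```

Note how the container only ever sees start() and stop(); everything 
about the thread is the component's private business.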

Per the avalon-framework contracts, a component is allowed to deal with 
any of the concerns 1-6 listed above itself, in any way it wants. The 
avalon-framework contract is very coarse with regard to thread 
management: it specifies only that components become active on start() 
and passive on stop().

This is on purpose, and it's good, because it allows broad reuse of the 
framework.

It also implies that containers don't concern themselves with thread 
management at all.


The Thread-Management contracts
-------------------------------
That is all fine and good, but it seems like a good idea to implement 
all the thread management logic once, and then reuse that for all 
components. Well, sure! That is what COP is all about. Let's identify a 
few contracts (I like code examples, so this is pseudocode):

/** A factory which builds new threads on demand. */
interface ThreadCreator			// concern 1
{
	Thread create( Runnable runnable );
}
/** An 'inverted' factory which destroys threads on demand. */
interface ThreadDestructor		// concern 4
{
	void destroy( Thread thread );
}
/** Provides a thread to a client on demand. */
interface ThreadAllocator		// concern 2
{
	Thread get( Runnable runnable );
}
/** Receives a thread that is no longer required by a client. */
interface ThreadDeallocator		// concern 3
{
	void release( Thread thread );
}
/**
 * Continuously observes a thread and notifies a delegate if anything
 * goes wrong with it.
 */
interface ThreadMonitor			// concern 5
{
	void monitor( Thread thread,
		IllegalThreadStateHandler listener );
}
/** Deals with anything that goes wrong during thread management. */
interface IllegalThreadStateHandler	// concern 6
{
	void handleIllegalThreadState(
		Thread thread, Throwable problem );
}

We've decomposed the "thread management" concern into distinct work 
interfaces. Now, experience shows that this is not particularly useful: 
these contracts are too fine-grained. Also, we've not yet shown the 
relationships between all these components. What might work better is:

/**
  * Handles thread creation, destruction, allocation, deallocation,
  * monitoring and thread corruption recovery.
  *
  * @avalon.dependency type="ExceptionHandler"
  * @avalon.dependency type="ThreadFactory"
  */
interface ThreadPool
{
	Thread get( Runnable runnable );
	void release( Thread thread );
}
interface ThreadFactory
{
	Thread create( Runnable runnable );
	void destroy( Thread thread );
}
interface ExceptionHandler
{
	void handle( Throwable problem );
}

Now, this is still pretty non-generic. We would like to abstract this 
even further:

/**
  * Handles creation, destruction, allocation, deallocation,
  * monitoring and recovery.
  *
  * @avalon.dependency type="ExceptionHandler"
  * @avalon.dependency type="Factory"
  */
interface Pool
{
	Object get();
	void release( Object object );
}
interface Factory
{
	Object create();
	void destroy( Object object );
}
interface ExceptionHandler
{
	void handle( Throwable problem );
}

This makes the interfaces much more generically reusable, at the cost of 
some class casts and thus less type safety. For example, we might have 
various pool implementations differing in their management policy, and 
each of these would be capable of managing both Bees and Threads. Also, 
we might have a single global ExceptionHandler that deals with 
everything from out of memory errors to thread corruption (by logging 
them, then calling System.exit(someStatus), perhaps).
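
As a rough sketch of what such a generic pool could look like, and of 
where the class casts show up: the Pool and Factory interfaces are the 
ones above, while SimplePool, BuilderFactory and the maxIdle policy are 
hypothetical, and the ExceptionHandler dependency is left out to keep 
the example short.

```java
import java.util.ArrayDeque;
import java.util.Deque;

interface Factory
{
    Object create();
    void destroy( Object object );
}

interface Pool
{
    Object get();
    void release( Object object );
}

/** Hypothetical policy: keep at most maxIdle spare objects, destroy the rest. */
class SimplePool implements Pool
{
    private final Factory m_factory;
    private final int m_maxIdle;
    private final Deque<Object> m_idle = new ArrayDeque<Object>();

    SimplePool( Factory factory, int maxIdle )
    {
        m_factory = factory;
        m_maxIdle = maxIdle;
    }

    public synchronized Object get()
    {
        // reuse an idle object if there is one, otherwise ask the factory
        return m_idle.isEmpty() ? m_factory.create() : m_idle.pop();
    }

    public synchronized void release( Object object )
    {
        if( m_idle.size() < m_maxIdle ) m_idle.push( object );
        else m_factory.destroy( object );
    }
}

/** Hypothetical factory that builds StringBuilders and counts its work. */
class BuilderFactory implements Factory
{
    int created;
    int destroyed;

    public Object create() { created++; return new StringBuilder(); }
    public void destroy( Object object ) { destroyed++; }
}

class SimplePoolDemo
{
    public static void main( String[] args )
    {
        BuilderFactory factory = new BuilderFactory();
        Pool pool = new SimplePool( factory, 1 );

        // the cast is the price of the generic interface
        StringBuilder sb = (StringBuilder)pool.get();
        sb.append( "pooled" );
        pool.release( sb );
        System.out.println( "created: " + factory.created );
    }
}
```

The pool never learns what it is pooling; swapping in a different 
Factory is all it takes to manage another kind of object.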

However, some annoying person decided that you can't associate and 
re-associate threads with runnables: a java.lang.Thread is bound to its 
Runnable when it is constructed. There are many ways around that, but 
I'll get to that later.
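
One possible workaround, sketched here under my own assumptions rather 
than taken from any avalon code, is to keep one long-lived thread and 
feed it runnables through a queue. ReusableWorker and 
ReusableWorkerDemo are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical reusable worker: one long-lived thread, many Runnables. */
class ReusableWorker
{
    private final BlockingQueue<Runnable> m_tasks =
        new LinkedBlockingQueue<Runnable>();
    private final Thread m_thread;

    ReusableWorker()
    {
        m_thread = new Thread( new Runnable()
        {
            public void run()
            {
                try
                {
                    // a task that throws would kill this thread; a real
                    // implementation would catch and report that
                    while( true ) m_tasks.take().run();
                }
                catch( InterruptedException e )
                {
                    // interrupted while waiting: treat as shutdown
                }
            }
        } );
        m_thread.start();
    }

    void submit( Runnable task ) { m_tasks.add( task ); }

    void shutdown() { m_thread.interrupt(); }
}

class ReusableWorkerDemo
{
    /** Runs two tasks on one worker; returns the thread name each one saw. */
    static List<String> run()
    {
        final List<String> names =
            Collections.synchronizedList( new ArrayList<String>() );
        final CountDownLatch done = new CountDownLatch( 2 );
        Runnable task = new Runnable()
        {
            public void run()
            {
                names.add( Thread.currentThread().getName() );
                done.countDown();
            }
        };

        ReusableWorker worker = new ReusableWorker();
        worker.submit( task );
        worker.submit( task );
        try { done.await(); }
        catch( InterruptedException e ) { Thread.currentThread().interrupt(); }
        worker.shutdown();
        return names;
    }

    public static void main( String[] args )
    {
        System.out.println( run() );
    }
}
```

Both tasks report the same thread name, so the thread was effectively 
re-associated with new work without ever touching java.lang.Thread.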


==============
Implementation
==============
A threaded component
--------------------
Consider a socket server. This is a good example of an active 
server-side component. It might not expose any methods to client 
components, so its work interface could be empty:

   /** Listens for socket requests and handles them. */
   interface SocketServer {}

Below is a possible implementation. It uses a single worker thread which 
listens for connections, then delegates those connections to a separate 
handler. Most actual socket server implementations delegate to a set of 
handlers kept in a pool. A handler looks like:

interface ConnectionHandler	                  // code sketch
{
   public void handle( Socket socket );
}

Here's the socket server implementation:

/** @avalon.component type="SocketServer" */
class ThreadedSocketServer implements SocketServer,
     Serviceable, Configurable, Initializable, Startable, Disposable
{
   public final static int DEFAULT_PORT = 80;
   public final static String DEFAULT_ADDRESS = "localhost";
   private int m_port;
   private String m_address;

   private Pool m_threadPool;

   private ServerSocket m_socket;
   private Worker m_worker;
   private Thread m_workerThread;
   private ConnectionHandler m_handler;

   public void configure( Configuration conf )
     throws ConfigurationException
   {
     m_port = conf.getChild("port").getValueAsInteger( DEFAULT_PORT );
     m_address = conf.getChild("address").getValue( DEFAULT_ADDRESS );
   }
   /**
    * @avalon.dependency type="Pool"
    * @avalon.dependency type="ConnectionHandler"
    */
   public void service( ServiceManager sm ) throws ServiceException
   {
     m_threadPool = (Pool)sm.lookup( Pool.ROLE );
     m_handler = (ConnectionHandler)sm.lookup( ConnectionHandler.ROLE );
   }

   public void initialize() throws Exception
   {
     m_socket = getServerSocket(); // helper not shown
     m_worker = new Worker();
   }

   public void start() throws Exception
   {
     // note: Pool.get() as declared above takes no argument; this
     // assumes a thread-aware variant like ThreadPool.get( Runnable )
     m_workerThread = (Thread)m_threadPool.get( m_worker );
     m_workerThread.start();
   }

   public void stop() throws Exception
   {
     m_worker.stop();
     Thread.sleep(100); // give the worker a moment to notice
     m_workerThread.interrupt();
   }

   public void dispose()
   {
     m_threadPool.release( m_workerThread );
     m_workerThread = null;
   }

   private class Worker implements Runnable
   {
       // volatile: written by stop() in one thread, read by run()
       // in another
       private volatile boolean running = false;

       public void stop()
       {
         running = false;
       }
       public void run()
       {
         running = true;

         while(running)
         {
           try
           {
             Socket socket = m_socket.accept(); // block
             m_handler.handle( socket ); // delegate
           }
           catch( IOException ioe )
           {
             running = false; // socket broken or closed: die
           }

           if(Thread.currentThread().isInterrupted())
           {
             running = false; // die
           }
         }
       }
   }
}
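
One caveat about stopping a worker like this: a thread blocked in 
ServerSocket.accept() does not wake up on interrupt(), but closing the 
socket from another thread makes accept() throw a SocketException. 
Here is a small sketch of that shutdown style, assuming the component 
is allowed to close its own socket in stop(). AcceptLoop and 
AcceptLoopDemo are hypothetical names:

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical accept loop that is stopped by closing its socket. */
class AcceptLoop implements Runnable
{
    private final ServerSocket m_socket;
    private volatile boolean m_running = true;

    AcceptLoop( ServerSocket socket ) { m_socket = socket; }

    public void run()
    {
        while( m_running )
        {
            try
            {
                Socket client = m_socket.accept(); // block
                client.close();  // a real server would delegate here
            }
            catch( IOException e )
            {
                // expected once stop() closes the socket; any other
                // I/O failure also ends the loop in this sketch
                m_running = false;
            }
        }
    }

    void stop() throws IOException
    {
        m_running = false;
        m_socket.close(); // a blocked accept() now throws SocketException
    }
}

class AcceptLoopDemo
{
    /** Starts the loop on an ephemeral port, stops it, reports the outcome. */
    static boolean stopsCleanly()
    {
        try
        {
            AcceptLoop loop = new AcceptLoop( new ServerSocket( 0 ) );
            Thread t = new Thread( loop );
            t.start();
            Thread.sleep( 100 ); // let the thread block in accept()
            loop.stop();
            t.join( 5000 );
            return !t.isAlive();
        }
        catch( Exception e )
        {
            return false;
        }
    }

    public static void main( String[] args )
    {
        System.out.println( "stopped cleanly: " + stopsCleanly() );
    }
}
```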

Ugly!
-----
There's a substantial amount of thread management code in this server. 
In fact, that's the bulk of the logic. How can we remove the last bits?

===========
Refactoring
===========

The Executor
------------
The socket server is not really concerned with threads; it doesn't care 
whether its worker is run in a unique thread, whether some form of time 
sharing is used, or even if it runs in some version of Java where the 
concept of "thread" doesn't even exist! All it needs is some other 
component which it can ask to make sure that its worker does what it does.

Our minds have been clouded by our familiarity with threading. If we step 
back a little here, what we see is that the socket server isn't looking 
for /thread/ management, it is looking for /execution/ management. 
Something like this:

interface Executor
{
   void execute( Runnable runnable );
}

This executor can do thread pooling, thread slicing, or apply whatever 
resource and thread management policies it wants. This allows the socket 
server to be simplified a little more:

/** @avalon.component type="SocketServer" */
class ThreadedSocketServer implements SocketServer,
     Serviceable, Configurable, Initializable, Startable
{
   public final static int DEFAULT_PORT = 80;
   public final static String DEFAULT_ADDRESS = "localhost";
   private int m_port;
   private String m_address;

   private Executor m_executor;

   private ServerSocket m_socket;
   private Worker m_worker;
   private ConnectionHandler m_handler;

   public void configure( Configuration conf )
     throws ConfigurationException
   {
     m_port = conf.getChild("port").getValueAsInteger( DEFAULT_PORT );
     m_address = conf.getChild("address").getValue( DEFAULT_ADDRESS );
   }
   /**
    * @avalon.dependency type="Executor"
    * @avalon.dependency type="ConnectionHandler"
    */
   public void service( ServiceManager sm ) throws ServiceException
   {
     m_executor = (Executor)sm.lookup( Executor.ROLE );
     m_handler = (ConnectionHandler)sm.lookup( ConnectionHandler.ROLE );
   }

   public void initialize() throws Exception
   {
     m_socket = getServerSocket(); // helper not shown
     m_worker = new Worker();
   }

   public void start() throws Exception
   {
     m_executor.execute( m_worker );
   }

   public void stop()
   {
     m_worker.stop();
   }

   private class Worker implements Runnable
   {
       // volatile: written by stop() in one thread, read by run()
       // in another
       private volatile boolean running = false;

       public void stop()
       {
         running = false;
       }
       public void run()
       {
         running = true;

         while(running)
         {
           try
           {
             Socket socket = m_socket.accept(); // block
             m_handler.handle( socket ); // delegate
           }
           catch( IOException ioe )
           {
             running = false; // socket broken or closed: die
           }

           if(Thread.currentThread().isInterrupted())
           {
             running = false; // die
           }
         }
       }
   }
}
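
To illustrate how much freedom that one-method interface leaves, here 
is a sketch of two executors with opposite policies. The Executor 
interface is the one above (java.util.concurrent.Executor in Java 5 and 
later has the same single method); DirectExecutor, 
ThreadPerTaskExecutor, NameRecorder and ExecutorDemo are hypothetical 
names of mine:

```java
import java.util.ArrayList;
import java.util.List;

interface Executor
{
    void execute( Runnable runnable );
}

/** Hypothetical policy 1: run the work in the caller's thread. */
class DirectExecutor implements Executor
{
    public void execute( Runnable runnable ) { runnable.run(); }
}

/** Hypothetical policy 2: one new thread per unit of work. */
class ThreadPerTaskExecutor implements Executor
{
    private final List<Thread> m_threads = new ArrayList<Thread>();

    public synchronized void execute( Runnable runnable )
    {
        Thread t = new Thread( runnable );
        m_threads.add( t );
        t.start();
    }

    /** Waits for everything started so far; only the demo needs this. */
    synchronized void awaitAll()
    {
        for( Thread t : m_threads )
        {
            try { t.join(); }
            catch( InterruptedException e )
            {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

/** Unit of work that remembers which thread ran it. */
class NameRecorder implements Runnable
{
    volatile String threadName;

    public void run() { threadName = Thread.currentThread().getName(); }
}

class ExecutorDemo
{
    public static void main( String[] args )
    {
        NameRecorder direct = new NameRecorder();
        new DirectExecutor().execute( direct );

        ThreadPerTaskExecutor threaded = new ThreadPerTaskExecutor();
        NameRecorder spawned = new NameRecorder();
        threaded.execute( spawned );
        threaded.awaitAll();

        System.out.println( "direct ran in:  " + direct.threadName );
        System.out.println( "spawned ran in: " + spawned.threadName );
    }
}
```

The client calls execute() the same way in both cases; only the 
component it was handed differs.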


Thread Monitoring and Error Management
--------------------------------------
But what if some thread grows stale? There is no way to check whether 
the worker is running, so in the event of a JVM problem the application 
slowly corrupts. Right?

Wrong! The Executor implementation has not been specified yet. Since 
we've decided it's in charge of thread and execution management, this 
also seems like a good place to deal with thread monitoring...

public interface ThreadMonitor
{
   void monitor( Runnable runnable, Thread runner );

   ThreadMonitorEntry[] getMonitoredThreads();
}
public class ThreadMonitorEntry
{
   public final Runnable runnable;
   public final Thread runner;

   public ThreadMonitorEntry( Runnable runnable, Thread runner )
   {
     this.runnable = runnable;
     this.runner = runner;
   }
}
/** @avalon.component type="Executor" */
public class MonitoringThreadedExecutor implements Executor,
     Serviceable
{
   ThreadMonitor m_monitor; // set through service()

   /** @avalon.dependency type="ThreadMonitor" */
   public void service( ServiceManager sm )
     throws ServiceException
   {
     m_monitor = (ThreadMonitor)sm.lookup( ThreadMonitor.ROLE );
   }

   public void execute( Runnable runnable )
   {
     Thread runner = new Thread( runnable );
     runner.start(); // start the thread, not the runnable

     m_monitor.monitor( runnable, runner );
   }
}
/** @avalon.component type="ThreadMonitor" */
public class ContainerThreadMonitor implements ThreadMonitor
{
   private Container m_container; // callback gotten from Context

   private List m_entries = new ArrayList();

   // synchronized: executors may register from several threads
   public synchronized void monitor( Runnable runnable, Thread runner )
   {
     ThreadMonitorEntry e = new ThreadMonitorEntry( runnable, runner );
     m_entries.add( e );
   }

   public synchronized ThreadMonitorEntry[] getMonitoredThreads()
   {
     return (ThreadMonitorEntry[])m_entries.toArray(
       new ThreadMonitorEntry[0] );
   }
}

...this clearly allows you to determine the state of your system: look 
up the ContainerThreadMonitor implementation via your management 
mechanism and you'll be able to inspect and modify all the threads. For 
example, you might do:

restart( containerHolder )       // BeanShell code sketch
{
   container = containerHolder.get( Container.ROLE );
   monitor = container.get( ThreadMonitor.ROLE );
   entries = monitor.getMonitoredThreads();
   for( int i = 0; i < entries.length; i++ )
   {
     entries[i].runner.interrupt();
   }

   Thread.sleep(10000);

   container.stop();
   container.dispose();
   container = new Container();
   container.start();

   containerHolder.set( Container.ROLE, container );
}


Advanced Thread Management
--------------------------
But what about just restarting the one component? The above 
implementation might be quite 'eavy, no? Well, to restart a single 
component, we'd need the monitor to be aware of what component a 
particular runnable belongs to. In code:


public class ThreadMonitorEntry
{
   public final Runnable runnable;
   public final Thread runner;
   public final Object component;

   public ThreadMonitorEntry( Runnable runnable, Thread runner,
       Object component )
   {
     this.runnable = runnable;
     this.runner = runner;
     this.component = component;
   }
}

restartDeadThreads( container )       // BeanShell code sketch
{
   monitor = container.get( ThreadMonitor.ROLE );
   entries = monitor.getMonitoredThreads();
   for( int i = 0; i < entries.length; i++ )
   {
     if(!entries[i].runner.isAlive()) // dumb check to see if something
                                      // is wrong, this should be
                                      // something intelligent
         container.restart( entries[i].component );
   }
}
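
The same sweep can be sketched in plain Java. This is a simplification 
under my own assumptions: it re-runs the runnable in a fresh thread 
instead of asking a container to restart the owning component, and it 
keeps the same "dumb" isAlive() check (which is also false for threads 
that were never started or that finished normally). MonitorEntry and 
SimpleThreadMonitor are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical record of a unit of work and the thread running it. */
class MonitorEntry
{
    final Runnable runnable;
    final Thread runner;

    MonitorEntry( Runnable runnable, Thread runner )
    {
        this.runnable = runnable;
        this.runner = runner;
    }
}

/** Hypothetical monitor that can replace dead threads on request. */
class SimpleThreadMonitor
{
    private final List<MonitorEntry> m_entries = new ArrayList<MonitorEntry>();

    synchronized void monitor( Runnable runnable, Thread runner )
    {
        m_entries.add( new MonitorEntry( runnable, runner ) );
    }

    /** Gives every entry whose thread is not alive a fresh thread. */
    synchronized int restartDead()
    {
        int restarted = 0;
        for( int i = 0; i < m_entries.size(); i++ )
        {
            MonitorEntry e = m_entries.get( i );
            if( !e.runner.isAlive() ) // dumb check, as above
            {
                Thread fresh = new Thread( e.runnable );
                m_entries.set( i, new MonitorEntry( e.runnable, fresh ) );
                fresh.start();
                restarted++;
            }
        }
        return restarted;
    }
}

class SimpleThreadMonitorDemo
{
    public static void main( String[] args )
    {
        Runnable work = new Runnable() { public void run() {} };

        SimpleThreadMonitor monitor = new SimpleThreadMonitor();
        monitor.monitor( work, new Thread( work ) );    // never started
        monitor.monitor( work, Thread.currentThread() ); // clearly alive
        System.out.println( "restarted: " + monitor.restartDead() );
    }
}
```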


But how do we get that component reference in the right location? 
There's a whole range of options. For example, the container could 
associate an instance with the thread it is created in (i.e., subclass 
Thread). But that's just ugly. "Subclassing considered evil until all 
alternatives have been exhausted", if you wish.

We could also modify the executor interface, requiring components to 
provide a reference to themselves:

interface Executor
{
   /** usage: execute( getWorker(), this ); */
   void execute( Runnable runnable, Object caller );
}

Or, we could opt to create an executor for each and every component that 
requires one, and delegate creation of executors to the container:

/** @avalon.component type="Executor" */
public class MonitoringThreadedExecutor implements Executor,
     Serviceable
{
   ThreadMonitor m_monitor; // set through service()
   Object m_component;      // set by the container

   /** @avalon.dependency type="ThreadMonitor" */
   public void service( ServiceManager sm )
     throws ServiceException
   {
     m_monitor = (ThreadMonitor)sm.lookup( ThreadMonitor.ROLE );
   }

   public void setComponent( Object component )
   {
     m_component = component;
   }

   public void execute( Runnable runnable )
   {
     Thread runner = new Thread( runnable );
     runner.start(); // start the thread, not the runnable

     // assumes ThreadMonitor.monitor() gains a third argument
     m_monitor.monitor( runnable, runner, m_component );
   }
}

A final option would be to use AOP or some smart proxying to rewrite the 
call to execute() to include the caller as an argument, but that's a bit 
too much magic to delve into here. Besides, I've done enough thinking 
for tonight :D

Anyway, the second version of MonitoringThreadedExecutor does not 
require modification of the Executor interface (which is a good thing, 
since the client component does not need to know that a reference to it 
is being kept), still allows components to run in various environments 
(if the container doesn't support executors, you'll just have to 
provide one yourself), and allows the monitoring and threading code to 
be reused as well.
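
The container side of that arrangement might look roughly like the 
sketch below. It is only an illustration: TinyContainer, 
PerComponentExecutor, OwnerAwareMonitor and OwnedEntry are hypothetical 
names, and a real container would wire this through its own lifecycle 
handling rather than a bare executorFor() method.

```java
import java.util.ArrayList;
import java.util.List;

interface Executor
{
    void execute( Runnable runnable );
}

/** Hypothetical monitor entry that remembers the owning component. */
class OwnedEntry
{
    final Runnable runnable;
    final Thread runner;
    final Object component;

    OwnedEntry( Runnable runnable, Thread runner, Object component )
    {
        this.runnable = runnable;
        this.runner = runner;
        this.component = component;
    }
}

/** Hypothetical monitor that records who owns each thread. */
class OwnerAwareMonitor
{
    final List<OwnedEntry> entries = new ArrayList<OwnedEntry>();

    synchronized void monitor( Runnable runnable, Thread runner,
        Object component )
    {
        entries.add( new OwnedEntry( runnable, runner, component ) );
    }
}

/** Executor bound to exactly one component by the container. */
class PerComponentExecutor implements Executor
{
    private final OwnerAwareMonitor m_monitor;
    private final Object m_component;

    PerComponentExecutor( OwnerAwareMonitor monitor, Object component )
    {
        m_monitor = monitor;
        m_component = component;
    }

    public void execute( Runnable runnable )
    {
        Thread runner = new Thread( runnable );
        m_monitor.monitor( runnable, runner, m_component );
        runner.start();
    }
}

/** Hypothetical container fragment: every component gets its own executor. */
class TinyContainer
{
    final OwnerAwareMonitor monitor = new OwnerAwareMonitor();

    Executor executorFor( Object component )
    {
        return new PerComponentExecutor( monitor, component );
    }
}

class TinyContainerDemo
{
    public static void main( String[] args )
    {
        TinyContainer container = new TinyContainer();
        Runnable work = new Runnable() { public void run() {} };

        // the component just calls execute(); it never passes itself
        container.executorFor( "componentA" ).execute( work );
        System.out.println( container.monitor.entries.get( 0 ).component );
    }
}
```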

==========
Conclusion
==========
We started off with a design issue: making thread management for 
components simpler. We did some preliminary analysis and came up with a 
very ugly solution. Through the application of COP, IoC and SoC, and 
some heavy refactoring, we've arrived at a much better solution to our 
problem. Along the way, we found out that we don't want thread 
management for our components, but rather execution management.

Our new solution does not require modification of Avalon-Framework, fits 
naturally with the average needs of Startable components, results in a 
codebase which is largely reusable, and allows advanced monitoring, 
error recovery, load balancing, and various other things to happen 
transparently inside advanced containers.

Clearly, COP/IoC/SoC are a winning combination when it comes to 
facilitating reuse and keeping frameworks compact but evolvable!

Putting this to use?
--------------------
So, should we now add Executor support to Merlin, along with thread 
monitoring coupled with dynamic component reloading on thread 
disruption? If so, where do we hook it in, and how?

IMO, this stuff should be an optional extension only and it should not 
disrupt existing code or make it too complex. In order to do that, some 
merlin refactoring might be required :D

Credits
-------
The Executor interface and pattern is from Doug Lea's "Concurrent 
Programming in Java". The pooling and socket server code draws from 
concepts introduced in avalon-cornerstone, which was written by various 
authors.

About the author
----------------
Leo Simons is a physics student and likes cracking tough programming 
problems in his free time. He has been hanging around the avalon project 
for years now.

cheers!

- LSD


