incubator-wadi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bdud...@apache.org
Subject svn commit: r356933 [8/35] - in /incubator/wadi/trunk: ./ etc/ modules/ modules/assembly/ modules/assembly/src/ modules/assembly/src/bin/ modules/assembly/src/conf/ modules/assembly/src/main/ modules/assembly/src/main/assembly/ modules/core/ modules/co...
Date Wed, 14 Dec 2005 23:36:16 GMT
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/AbstractDispatcher.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/AbstractDispatcher.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/AbstractDispatcher.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/AbstractDispatcher.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,587 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.impl;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.gridstate.Dispatcher;
+import org.codehaus.wadi.gridstate.DispatcherConfig;
+import org.codehaus.wadi.impl.Quipu;
+
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
+import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
+import EDU.oswego.cs.dl.util.concurrent.TimeoutException;
+
+//TODO - has grown and grown - could do with pruning/refactoring...
+
+/**
+ * The portable aspects of a Dispatcher implementation
+ *
+ * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
+ * @version $Revision: 1.16 $
+ */
+public abstract class AbstractDispatcher implements Dispatcher, MessageListener {
+	
+	protected final String _nodeName;
+	protected final String _clusterName;
+	protected final long _inactiveTime;
+	protected final Map _map;
+	protected final PooledExecutor _executor;
+	protected final Log _messageLog = LogFactory.getLog("org.codehaus.wadi.MESSAGES");
+	
+	public AbstractDispatcher(String nodeName, String clusterName, long inactiveTime) {
+		_nodeName=nodeName;
+		_clusterName=clusterName;
+		_inactiveTime=inactiveTime;
+		_map=new HashMap();
+		_executor=new PooledExecutor(); // parameterise
+		//_executor.setMinimumPoolSize(200);
+		//_executor.runWhenBlocked();
+		_executor.setThreadFactory(new ThreadFactory() {
+			protected int _count;
+			
+			public synchronized Thread newThread(Runnable runnable) {
+				//_log.info("CREATING THREAD: "+_count);
+				return new Thread(runnable, "WADI Dispatcher ("+(_count++)+")");
+			}
+		});
+	}
+	
+	protected Log _log = LogFactory.getLog(getClass());
+	protected DispatcherConfig _config;
+	protected final Map _rvMap = new ConcurrentHashMap();
+	
+	public void init(DispatcherConfig config) throws Exception {
+		_config=config;
+	}
+	
+	class TargetDispatcher implements InternalDispatcher {
+		protected final Object _target;
+		protected final Method _method;
+		protected final ThreadLocal _pair=new ThreadLocal(){protected Object initialValue() {return new Object[2];}};
+		
+		public TargetDispatcher(Object target, Method method) {
+			_target=target;
+			_method=method;
+		}
+		
+		public void dispatch(ObjectMessage om, Serializable obj) throws InvocationTargetException, IllegalAccessException {
+			Object[] pair=(Object[])_pair.get();
+			pair[0]=om;
+			pair[1]=obj;
+			_method.invoke(_target, pair);
+		}
+		
+		public String toString() {
+			return "<TargetDispatcher: "+_method+" dispatched on: "+_target+">";
+		}
+		
+		protected int _count;
+		public void incCount() {_count++;}
+		public void decCount() {_count--;}
+		public synchronized int getCount() {return _count;}
+	}
+	
+	class NewTargetDispatcher implements InternalDispatcher {
+		protected final Object _target;
+		protected final Method _method;
+		protected final ThreadLocal _singleton=new ThreadLocal(){protected Object initialValue() {return new Object[1];}};
+		
+		public NewTargetDispatcher(Object target, Method method) {
+			_target=target;
+			_method=method;
+		}
+		
+		public void dispatch(ObjectMessage message, Serializable request) throws InvocationTargetException, IllegalAccessException {
+			Object[] singleton=(Object[])_singleton.get();
+			singleton[0]=request;
+			Object response=_method.invoke(_target, singleton);
+			reply(message, (Serializable)response);
+		}
+		
+		public String toString() {
+			return "<TargetDispatcher: "+_method+" dispatched on: "+_target+">";
+		}
+		
+		protected int _count;
+		public void incCount() {_count++;}
+		public void decCount() {_count--;}
+		public synchronized int getCount() {return _count;}
+	}
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#register(java.lang.Object, java.lang.String, java.lang.Class)
+	 */
+	public InternalDispatcher register(Object target, String methodName, Class type) {
+		try {
+			Method method=target.getClass().getMethod(methodName, new Class[] {ObjectMessage.class, type});
+			if (method==null) return null;
+			
+			InternalDispatcher nuw=new TargetDispatcher(target, method);
+			
+			InternalDispatcher old=(InternalDispatcher)_map.put(type, nuw);
+			if (old!=null)
+				if (_log.isWarnEnabled()) _log.warn("later registration replaces earlier - multiple dispatch NYI: "+old+" -> "+nuw);
+			
+			if (_log.isTraceEnabled()) _log.trace("registering: "+type.getName()+"."+methodName+"()");
+			return nuw;
+		} catch (NoSuchMethodException e) {
+			if (_log.isErrorEnabled()) _log.error("no method: " + methodName + "(" + type.getName() + ") on class: " + target.getClass().getName(), e);
+			return null;
+		}
+	}
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#newRegister(java.lang.Object, java.lang.String, java.lang.Class)
+	 */
+	public InternalDispatcher newRegister(Object target, String methodName, Class type) {
+		try {
+			Method method=target.getClass().getMethod(methodName, new Class[] {type});
+			if (method==null) return null;
+			
+			InternalDispatcher nuw=new NewTargetDispatcher(target, method);
+			
+			InternalDispatcher old=(InternalDispatcher)_map.put(type, nuw);
+			if (old!=null)
+				if (_log.isWarnEnabled()) _log.warn("later registration replaces earlier - multiple dispatch NYI: "+old+" -> "+nuw);
+			
+			if (_log.isTraceEnabled()) _log.trace("registering: "+type.getName()+"."+methodName+"()");
+			return nuw;
+		} catch (NoSuchMethodException e) {
+			if (_log.isErrorEnabled()) _log.error("no method: " + methodName + "(" + type.getName() + ") on class: " + target.getClass().getName(), e);
+			return null;
+		}
+	}
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#deregister(java.lang.String, java.lang.Class, int)
+	 */
+	public boolean deregister(String methodName, Class type, int timeout) {
+		TargetDispatcher td=(TargetDispatcher)_map.get(type);
+		if (td==null)
+			return false;
+		else
+			// this isn't failproof - if onMessage has not yet been called,
+			// the counter may still read 0 - but it's the best we can do...
+			for (int i=timeout; td._count>0 && i>0; i--) {
+				try {
+					Thread.sleep(1000);
+				} catch (InterruptedException e) {
+					// ignore - TODO
+				}
+			}
+		
+		_map.remove(type);
+		return td._count<=0;
+	}
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#newDeregister(java.lang.String, java.lang.Class, int)
+	 */
+	public boolean newDeregister(String methodName, Class type, int timeout) {
+		NewTargetDispatcher td=(NewTargetDispatcher)_map.get(type);
+		if (td==null)
+			return false;
+		else
+			// this isn't failproof - if onMessage has not yet been called,
+			// the counter may still read 0 - but it's the best we can do...
+			for (int i=timeout; td._count>0 && i>0; i--) {
+				try {
+					Thread.sleep(1000);
+				} catch (InterruptedException e) {
+					// ignore - TODO
+				}
+			}
+		
+		_map.remove(type);
+		return td._count<=0;
+	}
+	
+	class RendezVousDispatcher implements InternalDispatcher {
+		protected final Map _rvMap2;
+		protected final long _timeout;
+		
+		public RendezVousDispatcher(Map rvMap, long timeout) {
+			_rvMap2=rvMap;
+			_timeout=timeout;
+		}
+		
+		public void dispatch(ObjectMessage om, Serializable obj) throws Exception {
+			// rendez-vous with waiting thread...
+			String correlationId=getIncomingCorrelationId(om);
+			synchronized (_rvMap2) {
+				Quipu rv=(Quipu)_rvMap2.get(correlationId);
+				if (rv==null) {
+					if (_log.isWarnEnabled()) _log.warn("no-one waiting: {"+correlationId+"} - "+obj);
+				} else {
+					if (_log.isTraceEnabled()) _log.trace("rendez-vous-ing with Quipu: "+correlationId);
+					rv.putResult(om);
+				}
+			}
+		}
+		
+		public String toString() {
+			return "<RendezVousDispatcher>";
+		}
+		
+		protected int _count;
+		public void incCount() {_count++;}
+		public void decCount() {_count--;}
+		public synchronized int getCount() {return _count;}
+	}
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#register(java.lang.Class, long)
+	 */
+	public void register(Class type, long timeout) {
+		_map.put(type, new RendezVousDispatcher(_rvMap, timeout));
+		if (_log.isTraceEnabled()) _log.trace("registering class: "+type.getName());
+	}
+	
+	class DispatchRunner implements Runnable {
+		protected final InternalDispatcher _dispatcher;
+		protected final ObjectMessage _objectMessage;
+		protected final Serializable _serializable;
+		
+		public DispatchRunner(InternalDispatcher dispatcher, ObjectMessage objectMessage, Serializable serializable) {
+			_dispatcher=dispatcher;
+			_objectMessage=objectMessage;
+			_serializable=serializable;
+		}
+		
+		public void run() {
+			try {
+				_dispatcher.dispatch(_objectMessage, _serializable);
+				synchronized (_dispatcher) {
+					_dispatcher.decCount();
+				}
+			} catch (Exception e) {
+				_log.error("problem dispatching message", e);
+			}
+		}
+	}
+	
+	//-----------------------------------------------------------------------------------------------
+	
+	class SimpleCorrelationIDFactory {
+		
+		protected final SynchronizedInt _count=new SynchronizedInt(0);
+		
+		public String create() {
+			return Integer.toString(_count.increment());
+		}
+		
+	}
+	
+	protected final SimpleCorrelationIDFactory _factory=new SimpleCorrelationIDFactory();
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#getRendezVousMap()
+	 */
+	public Map getRendezVousMap() {
+		return _rvMap;
+	}
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#nextCorrelationId()
+	 */
+	public String nextCorrelationId() {
+		return _factory.create();
+	}
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#setRendezVous(java.lang.String, int)
+	 */
+	public Quipu setRendezVous(String correlationId, int numLlamas) {
+		Quipu rv=new Quipu(numLlamas);
+		_rvMap.put(correlationId, rv);
+		return rv;
+	}
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#attemptRendezVous(java.lang.String, org.codehaus.wadi.impl.Quipu, long)
+	 */
+	public ObjectMessage attemptRendezVous(String correlationId, Quipu rv, long timeout) {
+		// rendez-vous with response/timeout...
+		ObjectMessage response=null;
+		try {
+			do {
+				try {
+					long startTime=System.currentTimeMillis();
+					if (rv.waitFor(timeout)) {
+						response=(ObjectMessage)rv.getResults().toArray()[0]; // TODO - Aargh!
+						long elapsedTime=System.currentTimeMillis()-startTime;
+						if (_log.isTraceEnabled()) _log.trace("successful message exchange within timeframe ("+elapsedTime+"<"+timeout+" millis) {"+correlationId+"}"); // session does not exist
+					} else {
+						response=null;
+						if (_log.isWarnEnabled()) _log.warn("unsuccessful message exchange within timeframe ("+timeout+" millis) {"+correlationId+"}", new Exception());
+					}
+				} catch (TimeoutException e) {
+					if (_log.isWarnEnabled()) _log.warn("no response to request within timeout ("+timeout+" millis)"); // session does not exist
+				} catch (InterruptedException e) {
+					if (_log.isWarnEnabled()) _log.warn("waiting for response - interruption ignored");
+				}
+			} while (Thread.interrupted());
+		} finally {
+			// tidy up rendez-vous
+			if (correlationId!=null)
+				_rvMap.remove(correlationId);
+		}
+		return response;
+	}
+	
+	// TODO - rather than owning this, we should be given a pointer to it at init()
+	// time, and this accessor should be removed...
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#getExecutor()
+	 */
+	public PooledExecutor getExecutor() {
+		return _executor;
+	}
+	
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#exchangeSendLoop(javax.jms.Destination, javax.jms.Destination, java.io.Serializable, long, int)
+	 */
+	public ObjectMessage exchangeSendLoop(Destination from, Destination to, Serializable body, long timeout, int iterations) {
+		ObjectMessage response=null;
+		for (int i=0; response==null && i<iterations; i++) {
+			response=exchangeSend(from, to, body, timeout);
+			if (response==null)
+				if (_log.isWarnEnabled()) _log.warn("null response - retrying: " + ( i + 1 ) + "/" + iterations);
+		}
+		return response;
+	}
+	
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#exchangeSend(javax.jms.Destination, javax.jms.Destination, java.io.Serializable, long)
+	 */
+	public ObjectMessage exchangeSend(Destination from, Destination to, Serializable body, long timeout) {
+		return exchangeSend(from, to, body, timeout, null);
+	}
+	
+	
+	public void onMessage(Message message) {
+		try {
+			ObjectMessage objectMessage=null;
+			Serializable body=null;
+			InternalDispatcher dispatcher;
+			if (
+					message instanceof ObjectMessage &&
+					(objectMessage=(ObjectMessage)message)!=null &&
+					(body=objectMessage.getObject())!=null &&
+					(dispatcher=(InternalDispatcher)_map.get(body.getClass()))!=null
+			) {
+				if (_messageLog.isTraceEnabled()) _messageLog.trace("incoming: "+body+" {"+getNodeName(message.getJMSReplyTo())+"->"+getNodeName(message.getJMSDestination())+"} - "+getIncomingCorrelationId(objectMessage)+"/"+getOutgoingCorrelationId(objectMessage));
+				do {
+					try {
+						synchronized (dispatcher) {
+							_executor.execute(new DispatchRunner(dispatcher, objectMessage, body)); // TODO - pool DispatchRunner ?
+							dispatcher.incCount();
+						}
+					} catch (InterruptedException e) {
+						// ignore
+					}
+				} while (Thread.interrupted());
+			} else {
+				if (_log.isWarnEnabled()) _log.warn("spurious message received: " + message);
+			}
+		} catch (Exception e) {
+			_log.warn("bad message", e);
+		}
+	}
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#reply(javax.jms.ObjectMessage, java.io.Serializable)
+	 */
+	public boolean reply(ObjectMessage message, Serializable body) {
+		try {
+			ObjectMessage om=createObjectMessage();
+			Destination from=getLocalDestination();
+			om.setJMSReplyTo(from);
+			Destination to=message.getJMSReplyTo();
+			om.setJMSDestination(to);
+			String incomingCorrelationId=getOutgoingCorrelationId(message);
+			setIncomingCorrelationId(om, incomingCorrelationId);
+			om.setObject(body);
+			if (_log.isTraceEnabled()) _log.trace("reply: " + getNodeName(from) + " -> " + getNodeName(to) + " {" + incomingCorrelationId + "} : " + body);
+			send(to, om);
+			return true;
+		} catch (Exception e) {
+			_log.error("problem replying to message", e);
+			return false;
+		}
+	}
+	
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#send(javax.jms.Destination, javax.jms.Destination, java.lang.String, java.io.Serializable)
+	 */
+	public boolean send(Destination from, Destination to, String outgoingCorrelationId, Serializable body) {
+		try {
+			ObjectMessage om=createObjectMessage();
+			om.setJMSReplyTo(from);
+			om.setJMSDestination(to);
+			setOutgoingCorrelationId(om, outgoingCorrelationId);
+			om.setObject(body);
+			if (_log.isTraceEnabled()) _log.trace("send {" + outgoingCorrelationId + "}: " + getNodeName(from) + " -> " + getNodeName(to) + " : " + body);
+			send(to, om);
+			return true;
+		} catch (Exception e) {
+			if (_log.isErrorEnabled()) _log.error("problem sending " + body, e);
+			return false;
+		}
+	}
+	
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#exchangeSend(javax.jms.Destination, javax.jms.Destination, java.io.Serializable, long, java.lang.String)
+	 */
+	public ObjectMessage exchangeSend(Destination from, Destination to, Serializable body, long timeout, String targetCorrelationId) {
+		try {
+			ObjectMessage om=createObjectMessage();
+			om.setJMSReplyTo(from);
+			om.setJMSDestination(to);
+			om.setObject(body);
+			String correlationId=nextCorrelationId();
+			setOutgoingCorrelationId(om, correlationId);
+			if (targetCorrelationId!=null)
+				setIncomingCorrelationId(om, targetCorrelationId);
+			Quipu rv=setRendezVous(correlationId, 1);
+			if (_log.isTraceEnabled()) _log.trace("exchangeSend {" + correlationId + "}: " + getNodeName(from) + " -> " + getNodeName(to) + " : " + body);
+			send(to, om);
+			return attemptRendezVous(correlationId, rv, timeout);
+		} catch (Exception e) {
+			if (_log.isErrorEnabled()) _log.error("problem sending " + body, e);
+			return null;
+		}
+	}
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#exchangeSend(javax.jms.Destination, javax.jms.Destination, java.lang.String, java.io.Serializable, long)
+	 */
+	public ObjectMessage exchangeSend(Destination from, Destination to, String outgoingCorrelationId, Serializable body, long timeout) {
+		Quipu rv=null;
+		// set up a rendez-vous...
+		rv=setRendezVous(outgoingCorrelationId, 1);
+		// send the message...
+		if (send(from, to, outgoingCorrelationId, body)) {
+			return attemptRendezVous(outgoingCorrelationId, rv, timeout);
+		} else {
+			return null;
+		}
+	}
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#reply(javax.jms.Destination, javax.jms.Destination, java.lang.String, java.io.Serializable)
+	 */
+	public boolean reply(Destination from, Destination to, String incomingCorrelationId, Serializable body) {
+		try {
+			ObjectMessage om=createObjectMessage();
+			om.setJMSReplyTo(from);
+			om.setJMSDestination(to);
+			setIncomingCorrelationId(om, incomingCorrelationId);
+			om.setObject(body);
+			if (_log.isTraceEnabled()) _log.trace("reply: " + getNodeName(from) + " -> " + getNodeName(to) + " {" + incomingCorrelationId + "} : " + body);
+			send(to, om);
+			return true;
+		} catch (Exception e) {
+			if (_log.isErrorEnabled()) _log.error("problem sending " + body, e);
+			return false;
+		}
+	}
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#exchangeReply(javax.jms.ObjectMessage, java.io.Serializable, long)
+	 */
+	public ObjectMessage exchangeReply(ObjectMessage message, Serializable body, long timeout) {
+		try {
+			ObjectMessage om=createObjectMessage();
+			Destination from=getLocalDestination();
+			om.setJMSReplyTo(from);
+			Destination to=message.getJMSReplyTo();
+			om.setJMSDestination(to);
+			String incomingCorrelationId=getOutgoingCorrelationId(message);
+			setIncomingCorrelationId(om, incomingCorrelationId);
+			String outgoingCorrelationId=nextCorrelationId();
+			setOutgoingCorrelationId(om, outgoingCorrelationId);
+			om.setObject(body);
+			Quipu rv=setRendezVous(outgoingCorrelationId, 1);
+			if (_log.isTraceEnabled()) _log.trace("exchangeSend {" + outgoingCorrelationId + "}: " + getNodeName(from) + " -> " + getNodeName(to) + " {" + incomingCorrelationId + "} : " + body);
+			send(to, om);
+			return attemptRendezVous(outgoingCorrelationId, rv, timeout);
+		} catch (Exception e) {
+			if (_log.isErrorEnabled()) _log.error("problem sending " + body, e);
+			return null;
+		}
+	}
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#exchangeReplyLoop(javax.jms.ObjectMessage, java.io.Serializable, long)
+	 */
+	public ObjectMessage exchangeReplyLoop(ObjectMessage message, Serializable body, long timeout) { // TODO
+		return exchangeReply(message, body, timeout);
+	}
+	
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#forward(javax.jms.ObjectMessage, javax.jms.Destination)
+	 */
+	public boolean forward(ObjectMessage message, Destination destination) {
+		try {
+			return forward(message, destination, message.getObject());
+		} catch (JMSException e) {
+			_log.error("problem forwarding message with new body", e);
+			return false;
+		}
+	}
+	
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.impl.Dispatcher#forward(javax.jms.ObjectMessage, javax.jms.Destination, java.io.Serializable)
+	 */
+	public boolean forward(ObjectMessage message, Destination destination, Serializable body) {
+		try {
+			return send(message.getJMSReplyTo(), destination, getOutgoingCorrelationId(message), body);
+		} catch (Exception e) {
+			_log.error("problem forwarding message", e);
+			return false;
+		}
+	}
+	
+	public String getNodeName() {
+		return _nodeName;
+	}
+	
+	public long getInactiveTime() {
+		return _inactiveTime;
+	}
+	
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/DummyPartitionManager.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/DummyPartitionManager.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/DummyPartitionManager.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/DummyPartitionManager.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,75 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.impl;
+
+import javax.jms.Destination;
+
+import org.codehaus.wadi.gridstate.Dispatcher;
+import org.codehaus.wadi.gridstate.PartitionManager;
+import org.codehaus.wadi.gridstate.PartitionManagerConfig;
+
+public class DummyPartitionManager implements PartitionManager {
+
+	protected final int _numPartitions;
+	
+	public DummyPartitionManager(int numPartitions) {
+		super();
+		_numPartitions=numPartitions;
+	}
+
+	public void init(PartitionManagerConfig config) {
+		// empty
+	}
+
+	public void start() {
+		// empty
+	}
+	
+	public void stop() {
+		// empty
+	}
+
+	public PartitionFacade[] getPartitions() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public int getNumPartitions() {
+		return _numPartitions;
+	}
+
+	public PartitionFacade getPartition(Object key) {
+		// TODO Auto-generated method stub
+		return null;
+	}
+	
+	// PartitionConfig
+	
+	public Dispatcher getDispatcher() {
+		return null;
+	}
+
+	public Destination getLocalDestination() {
+		return null;
+	}
+
+	public void evacuate() {
+		// TODO Auto-generated method stub
+		
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/GCache.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/GCache.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/GCache.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/GCache.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,379 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.impl;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.cache.Cache;
+import javax.cache.CacheEntry;
+import javax.cache.CacheException;
+import javax.cache.CacheListener;
+import javax.cache.CacheStatistics;
+import javax.jms.Destination;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.gridstate.Dispatcher;
+import org.codehaus.wadi.gridstate.DispatcherConfig;
+import org.codehaus.wadi.gridstate.LockManager;
+import org.codehaus.wadi.gridstate.PartitionConfig;
+import org.codehaus.wadi.gridstate.PartitionManager;
+import org.codehaus.wadi.gridstate.PartitionManagerConfig;
+import org.codehaus.wadi.gridstate.StateManager;
+import org.codehaus.wadi.gridstate.StateManagerConfig;
+
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+
+/**
+ * Geronimo is going to need a standard API for lookup of sessions across the Cluster.
+ * JCache is the obvious choice.
+ * This will allow the plugging of either e.g. GCache (WADI), Tangosol's Coherence or IPMs solution without changing of Geronimo code.
+ * In fact, this will allow WADI to sit on top of any of these three.
+ *
+ * GCache is a JCache compatible interface onto DIndex - WADI's own distributed index, which fulfills
+ * WADI's requirements for this lookup...
+ *
+ * @author jules
+ *
+ */
+public class GCache implements Cache, DispatcherConfig, StateManagerConfig, PartitionManagerConfig {
+
+	protected final Log _log=LogFactory.getLog(getClass().getName());
+
+	protected final Dispatcher _dispatcher;
+	protected final PartitionManager _partitionManager;
+	protected final StateManager _stateManager;
+	protected final Map _map=new HashMap();
+	protected final LockManager _pmSyncs=new StupidLockManager("PM");
+	protected final LockManager _smSyncs=new StupidLockManager("IM/SM");
+
+	// interactional state - ThreadLocal
+
+	protected ThreadLocal _threadLocks=new ThreadLocal() {
+		public Object initialValue() {
+			return new HashMap();
+		}
+	};
+
+	// release the whole LockSet - the end of an 'interaction'
+	public void release() {
+		// Map is ThreadLocal - so no need to synchronise...
+		for (Iterator i=((Map)_threadLocks.get()).entrySet().iterator(); i.hasNext(); ) {
+			Entry entry=(Entry)i.next();
+			Object key=entry.getKey();
+			Sync sync=(Sync)entry.getValue();
+			sync.release();
+			i.remove();
+            if (_log.isInfoEnabled()) _log.info("released: " + key);
+		}
+	}
+
+	// add a lock to the LockSet/interaction
+	protected void addLock(Object key, Sync newSync) {
+		Map locks=(Map)_threadLocks.get();
+		Sync oldSync=(Sync)locks.get(key);
+
+        if (_log.isInfoEnabled()) _log.info("adding: " + key);
+		if (oldSync==null) {
+			locks.put(key, newSync);
+		} else {
+            if (_log.isWarnEnabled()) _log.warn("NYI...");
+		}
+	}
+
+	public GCache(Dispatcher dispatcher, PartitionManager partitionManager, StateManager stateManager) throws Exception {
+		_dispatcher=dispatcher;
+		_dispatcher.init(this);
+		_partitionManager=partitionManager;
+		_stateManager=stateManager;
+	}
+
+	/*
+	 * Does the local cache contain the given key ? Does no locking, so if it replies true
+	 *  and you go back to the cache for the value, it may have evaporated...Is this correct ?
+	 */
+	public boolean containsKey(Object key) {
+		// TODO - correct ?
+		synchronized (_map) {
+			return _map.containsKey(key);
+		}
+	}
+
+	/*
+	 * third pass
+	 */
+	public boolean containsValue(Object value) {
+		throw new UnsupportedOperationException();
+	}
+
+	/*
+	 * third pass
+	 */
+	public Set entrySet() {
+		throw new UnsupportedOperationException();
+	}
+
+	/*
+	 * second pass
+	 */
+	public boolean isEmpty() {
+		throw new UnsupportedOperationException();
+	}
+
+	/*
+	 * second pass
+	 */
+	public Set keySet() {
+		throw new UnsupportedOperationException();
+	}
+
+	/*
+	 * first pass
+	 */
+	public void putAll(Map t) {
+		// TODO Auto-generated method stub
+	}
+
+	/*
+	 * first pass
+	 */
+	public int size() {
+		return getCacheStatistics().getObjectCount();
+	}
+
+	/*
+	 * third pass
+	 */
+	public Collection values() {
+		throw new UnsupportedOperationException();
+	}
+
+	/*
+	 * Find, globally, the value associated with this key and return it.
+	 */
+	public Object get(Object key) {
+		return _stateManager.get(key);
+	}
+
+	/*
+	 * first/second pass
+	 */
+	public Map getAll(Collection keys) throws CacheException {
+		throw new UnsupportedOperationException();
+	}
+
+	/*
+	 * second pass ?
+	 */
+	public void load(Object key) throws CacheException {
+		throw new UnsupportedOperationException();
+	}
+
+	/*
+	 * second pass ?
+	 */
+	public void loadAll(Collection keys) throws CacheException {
+		throw new UnsupportedOperationException();
+	}
+
+	/*
+	 * Find, locally, the value associated with this key and return it (no loaders called)
+	 */
+	public Object peek(Object key) {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	/*
+	 * Put, globally, this key:value association, returning any value previously globally associated with the same key.
+	 */
+	public Object put(Object key, Object value) {
+		return put(key, value, true, true);
+	}
+
+
+	/**
+	 * Extension: Insert, globally, the key:value association for the first time. This can be more efficient than a put(),
+	 * as we can be sure that it will not have to return a value that is held remotely. Returns success if the insertion was
+	 * able to occur (i.e. there was no value previously associated with this key).
+	 *
+	 * @param key
+	 * @param value
+	 * @return
+	 */
+	public boolean putFirst(Object key, Object value) {
+		return ((Boolean)put(key, value, false, true)).booleanValue();
+	}
+
+
+	/**
+	 * Extension: Insert, globally, the key:value association only overwriting a previous value and returning it, if
+	 * the relevant flags are passed in. If overwrite is true, returnOldValue will return any value previously associated
+	 * with this key, else the insertion will occur and return Boolean.TRUE only if NO previous association exists otherwise
+	 * Boolean.FALSE will be returned.
+	 *
+	 * @param key
+	 * @param value
+	 * @param overwrite
+	 * @param returnOldValue
+	 * @return
+	 */
+	public Object put(Object key, Object value, boolean overwrite, boolean returnOldValue) {
+		return _stateManager.put(key, value, overwrite, returnOldValue);
+	}
+
+	/*
+	 * first pass ?
+	 * interesting - perhaps this is how we make location accessible
+	 */
+	public CacheEntry getCacheEntry(Object key) {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	/*
+	 * not sure ?
+	 */
+	public CacheStatistics getCacheStatistics() {
+		// TODO Auto-generated method stub
+		return null;
+
+		// needs to return :
+		// objectCount
+		// hits
+		// misses
+
+		// we will do best effort on all of these
+		// they can be included in each node's distributed state and aggregated on demand
+	}
+
+	/*
+	 * Remove, globally, any current association with this key, returning the correspondong value.
+	 */
+	public Object remove(Object key) {
+		return _stateManager.remove(key, true);
+	}
+
+	/**
+	 * Remove the key's association globally, returning its current value if the flag is true.
+	 * If the value is held remotely and not required, it will save bandwidth to pass in a value of false.
+	 *
+	 * @param key
+	 * @param returnOldValue
+	 * @return
+	 */
+	public Object remove(Object key, boolean returnOldValue) {
+		return _stateManager.remove(key, returnOldValue);
+	}
+
+	/*
+	 * third pass
+	 */
+	public void clear() {
+		throw new UnsupportedOperationException();
+	}
+
+	/*
+	 * first pass ?
+	 */
+	public void evict() {
+		// TODO Auto-generated method stub
+	}
+
+	/*
+	 * second/third pass
+	 */
+	public void addListener(CacheListener listener) {
+		throw new UnsupportedOperationException();
+	}
+
+	/*
+	 * second/third pass
+	 */
+	public void removeListener(CacheListener listener) {
+		throw new UnsupportedOperationException();
+	}
+
+	// Proprietary
+
+	public PartitionFacade[] getPartitions() {
+		return _partitionManager.getPartitions();
+	}
+
+	// for testing...
+	public Map getMap() {
+		return _map;
+	}
+
+	public LockManager getPMSyncs() {
+		return _pmSyncs;
+	}
+
+	public LockManager getSMSyncs() {
+		return _smSyncs;
+	}
+
+	public PartitionConfig getPartitionConfig() {
+		return (PartitionConfig)_partitionManager;
+	}
+
+	public PartitionManager getPartitionManager() {
+		return _partitionManager;
+	}
+
+	public StateManager getStateManager() {
+		return _stateManager;
+	}
+
+	public void start() throws Exception {
+    	_partitionManager.start();
+    	_stateManager.start();
+    }
+
+    public void stop() throws Exception {
+    	_stateManager.stop();
+    	_partitionManager.stop();
+    }
+
+	public PartitionFacade getPartition(Object key) {
+		return _partitionManager.getPartition(key);
+	}
+
+	// PartitionManagerConfig
+
+	public Destination getLocalDestination() {
+		return _dispatcher.getLocalDestination();
+	}
+
+	// LifeCycle
+
+	public void init() throws Exception {
+		_partitionManager.init(this);
+		_stateManager.init(this);
+	}
+
+	// DispatcherConfig API
+
+	public String getContextPath() {
+		return "/";
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/GCacheEntry.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/GCacheEntry.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/GCacheEntry.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/GCacheEntry.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,107 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.impl;
+
+import javax.cache.CacheEntry;
+
+import org.codehaus.wadi.impl.Utils;
+
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.ReaderPreferenceReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+
+public class GCacheEntry implements CacheEntry {
+
+	// GCacheEntry
+
+	protected ReadWriteLock _lock=new ReaderPreferenceReadWriteLock();
+
+	// the read lock is used by any thread wanting to keep this entry in this JVM
+	public void acquireReadLock() {
+		Utils.safeAcquire(_lock.readLock());
+	}
+
+	public void releaseReadLock() {
+		_lock.readLock().release();
+	}
+
+	// the write lock is used by any thread wishing to remove this entry from this jvm
+	public Sync getWriteLock() {
+		return _lock.writeLock();
+	}
+
+	protected Object _value;
+
+	public GCacheEntry(Object value) {
+		_value=value;
+	}
+
+	// CacheEntry
+
+	public int getHits() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public long getLastAccessTime() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public long getLastUpdateTime() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public long getCreationTime() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public long getExpirationTime() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public long getVersion() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public boolean isValid() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	public long getCost() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public Object getKey() {
+		throw new UnsupportedOperationException("NYI");
+	}
+
+	public Object getValue() {
+		return _value;
+	}
+
+	public Object setValue(Object value) {
+		throw new UnsupportedOperationException("NYI");
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/GridStateContextualiser.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/GridStateContextualiser.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/GridStateContextualiser.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/GridStateContextualiser.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,53 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.impl;
+
+import org.codehaus.wadi.Contextualiser;
+import org.codehaus.wadi.Emoter;
+import org.codehaus.wadi.Immoter;
+import org.codehaus.wadi.Locker;
+import org.codehaus.wadi.Motable;
+import org.codehaus.wadi.impl.AbstractSharedContextualiser;
+
+public class GridStateContextualiser extends AbstractSharedContextualiser {
+
+	public GridStateContextualiser(Contextualiser next, Locker locker) {
+		super(next, locker, false);
+		// TODO Auto-generated constructor stub
+	}
+
+	public Emoter getEmoter() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Immoter getImmoter() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Motable get(String id) {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public void load(Emoter emoter, Immoter immoter) {
+		// TODO Auto-generated method stub
+
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/HybridLockManager.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/HybridLockManager.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/HybridLockManager.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/HybridLockManager.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,81 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.impl;
+
+import org.codehaus.wadi.impl.Utils;
+
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+
+/**
+ * A LockManager which allows you to store the lock that corresponds to an Object in that Object.
+ * An Adaptor is used to extract the lock from the Object.
+ * If the Object is null, the LockManager can either allocate its own lock (which will hang around for as long as it is used),
+ * or just return null.
+ *
+ * @author jules
+ *
+ */
+public class HybridLockManager extends SmartLockManager {
+
+	interface Adaptor {
+		Sync adapt(Object value);
+		boolean isValid(Object value);
+	}
+
+	protected final Adaptor _adaptor;
+	protected final boolean _always;
+
+	// There are two ways we can lock :
+	// 1. keep locks outside objects locked - even if we have not yet got the object in our hand we can lock it
+	// 2. keep lock in object - retrieve object, lock it and then check it is still valid (may have been removed between finding object and acquiring lock)
+	// This class allows us to use (2) in the case where the object sometimes does not exist at lock time...
+
+	public HybridLockManager(String name, Adaptor adaptor, boolean always) { // TODO - NEEDS TESTING !!
+		super(name);
+		_adaptor=adaptor;
+		_always=always;
+	}
+
+	public Sync acquire(Object key) {
+		return acquire(key, null);
+	}
+
+	public Sync acquire(Object key, Object value) {
+		Sync sync;
+		if (value==null) {
+			if (_always) {
+				return super.acquire(key);
+			} else {
+				return null;
+			}
+		} else {
+			sync=_adaptor.adapt(value);
+			if (sync==null) {
+				return null;
+			} else {
+				Utils.safeAcquire(sync);
+				if (_adaptor.isValid(value)) {
+					return sync;
+				} else {
+					sync.release();
+					return null;
+				}
+			}
+		}
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/IndirectStateManager.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/IndirectStateManager.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/IndirectStateManager.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/IndirectStateManager.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,423 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.impl;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.dindex.newmessages.MoveIMToSM;
+import org.codehaus.wadi.dindex.newmessages.MovePMToSM;
+import org.codehaus.wadi.dindex.newmessages.MoveSMToIM;
+import org.codehaus.wadi.dindex.newmessages.MoveSMToPM;
+import org.codehaus.wadi.dindex.newmessages.MovePMToIM;
+import org.codehaus.wadi.gridstate.Dispatcher;
+import org.codehaus.wadi.gridstate.StateManager;
+import org.codehaus.wadi.gridstate.StateManagerConfig;
+import org.codehaus.wadi.gridstate.messages.ReadIMToPM;
+import org.codehaus.wadi.gridstate.messages.WriteIMToPM;
+import org.codehaus.wadi.gridstate.messages.WritePMToIM;
+
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+
+// TODO - needs tidying up..
+
+public class IndirectStateManager implements StateManager {
+
+	protected final Log _log = LogFactory.getLog(getClass());
+
+	protected final String _clusterName = "ORG.CODEHAUS.WADI.TEST";
+	protected final long _timeout;
+	protected final Dispatcher _dispatcher; // should be final
+	protected final String _nodeName;
+
+	protected StateManagerConfig _config;
+
+
+	public IndirectStateManager(Dispatcher dispatcher, long timeout) throws Exception {
+    	_dispatcher=dispatcher;
+    	_timeout=timeout;
+    	_nodeName=_dispatcher.getNodeName();
+
+		// Get - 5 messages - IM->PM->SM->IM->SM->PM
+		_dispatcher.register(this, "onMessage", ReadIMToPM.class);
+		_dispatcher.register(this, "onMessage", MovePMToSM.class);
+		_dispatcher.register(MoveSMToIM.class, _timeout);
+		_dispatcher.register(MoveIMToSM.class, _timeout);
+		_dispatcher.register(MoveSMToPM.class, _timeout);
+		// Get - 2 messages - IM->PM->IM (NYI)
+		_dispatcher.register(MovePMToIM.class, _timeout);
+
+		// Put - 2 messages - IM->PM->IM
+		_dispatcher.register(this, "onMessage", WriteIMToPM.class);
+		_dispatcher.register(WritePMToIM.class, _timeout);
+	}
+
+	public void init(StateManagerConfig config) {
+		_config=config;
+	}
+
+	public void start() throws Exception {
+		Map state=new HashMap();
+		state.put("nodeName", _nodeName);
+		_dispatcher.setDistributedState(state);
+		_dispatcher.start();
+	}
+
+	public void stop() throws Exception {
+		_dispatcher.stop();
+	}
+
+	//--------------------------------------------------------------------------------
+	// Get
+	//--------------------------------------------------------------------------------
+
+	// called on IM...
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.sandbox.gridstate.StateManager#get(java.io.Object)
+	 */
+	public Object get(Object key) {
+		Sync sync=null;
+		String agent=_nodeName;
+		try {
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(IM)] - " + key + " - acquiring sync(" + sync + ")..." + " <" + Thread.currentThread().getName() + ">");
+			sync=_config.getSMSyncs().acquire(key); // TODO - an SMSync should actually be a lock in the state itself - read or write ?
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(IM)] - " + key + " - ...sync(" + sync + ") acquired" + " <" + Thread.currentThread().getName() + ">");
+			Object value=null;
+			Map map=_config.getMap();
+			synchronized (map) {
+				value=map.get(key);
+			}
+			if (value!=null)
+				return value;
+			else {
+				// exchangeSendLoop GetIMToPM to PM
+				Destination im=_dispatcher.getLocalDestination();
+				Destination pm=_config.getPartition(key).getDestination();
+				ReadIMToPM request=new ReadIMToPM(key, im);
+				ObjectMessage message=_dispatcher.exchangeSendLoop(im, pm, request, _timeout, 10);
+				Object response=null;
+				try {
+					response=message.getObject();
+				} catch (JMSException e) {
+				  _log.error("unexpected problem", e); // should be in loop - TODO
+				}
+
+				if (response instanceof MovePMToIM) {
+					// association not present
+					value=null;
+				} else if (response instanceof MoveSMToIM) {
+					// association exists
+					// associate returned value with key
+					value=((MoveSMToIM)response).getValue();
+					//_log.info("received "+key+"="+value+" <- SM");
+					synchronized (_config.getMap()) {
+						map.put(key, value);
+					}
+					// reply GetIMToSM to SM
+					_dispatcher.reply(message, new MoveIMToSM());
+				}
+
+				return value;
+			}
+		} finally {
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(IM)] - " + key + " - releasing sync(" + sync + ")..." + " <" + Thread.currentThread().getName() + ">");
+			sync.release();
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(IM)] - " + key + " - ...sync(" + sync + ") released" + " <" + Thread.currentThread().getName() + ">");
+		}
+	}
+
+	// called on PM...
+	public void onMessage(ObjectMessage message1, ReadIMToPM get) {
+		// what if we are NOT the PM anymore ?
+		// get write lock on location
+		Object key=get.getKey();
+		Sync sync=null;
+		String agent=_dispatcher.getNodeName((Destination)get.getIM());
+		try {
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(PM)] - " + key + " - acquiring sync(" + sync + ")..." + " <" + Thread.currentThread().getName() + ">");
+			sync=_config.getPMSyncs().acquire(key); // TODO - PMSyncs are actually WLocks on a given sessions location (partition entry) - itegrate
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(PM)] - " + key + " - ...sync(" + sync + ") acquired" + " <" + Thread.currentThread().getName() + ">");
+			PartitionFacade partition=_config.getPartition(key);
+			Location location=(Location)partition.getLocation(key);
+			if (location==null) {
+				_dispatcher.reply(message1,new MovePMToIM());
+				return;
+			}
+			// exchangeSendLoop GetPMToSM to SM
+			Destination im=(Destination)get.getIM();
+			Destination pm=_dispatcher.getLocalDestination();
+			Destination sm=(Destination)location.getValue();
+			String poCorrelationId=null;
+			try {
+				poCorrelationId=_dispatcher.getOutgoingCorrelationId(message1);
+				//_log.info("Process Owner Correlation ID: "+poCorrelationId);
+			} catch (Exception e) {
+			  _log.error("unexpected problem", e);
+			}
+			MovePMToSM request=new MovePMToSM(key, im, pm, poCorrelationId);
+			ObjectMessage message2=_dispatcher.exchangeSendLoop(pm, sm, request, _timeout, 10);
+			if (message2==null)
+			  _log.error("NO RESPONSE WITHIN TIMEFRAME - PANIC!");
+
+//			MoveSMToPM response=null;
+//			try {
+//				response=(MoveSMToPM)message2.getObject();
+//			} catch (JMSException e) {
+//			  _log.error("unexpected problem", e); // should be sorted in loop
+//			}
+			// alter location
+			location.setValue((Destination)get.getIM());
+
+		} finally {
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(PM)] - " + key + " - releasing sync(" + sync + ")..." + " <" + Thread.currentThread().getName() + ">");
+			sync.release();
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(PM)] - " + key + " - ...sync(" + sync + ") released" + " <" + Thread.currentThread().getName() + ">");
+		}
+	}
+
+	// called on SM...
+	public void onMessage(ObjectMessage message1, MovePMToSM get) {
+		Object key=get.getKey();
+		String agent=_dispatcher.getNodeName((Destination)get.getIM());
+		Sync sync=null;
+		try {
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(SM)] - " + key + " - acquiring sync(" + sync + ")..." + " <" + Thread.currentThread().getName() + ">");
+			sync=_config.getSMSyncs().acquire(key);
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(SM)] - " + key + " - ...sync(" + sync + ") acquired" + " <" + Thread.currentThread().getName() + ">");
+			// send GetSMToIM to IM
+			Destination sm=_dispatcher.getLocalDestination();
+			Destination im=(Destination)get.getIM();
+			Object value;
+			Map map=_config.getMap();
+			synchronized (map) {
+				value=map.get(key);
+			}
+			//_log.info("sending "+key+"="+value+" -> IM");
+			MoveSMToIM request=new MoveSMToIM(key, value);
+			ObjectMessage message2=(ObjectMessage)_dispatcher.exchangeSend(sm, im, request, _timeout, get.getIMCorrelationId());
+			// wait
+			// receive GetIMToSM
+
+			if (message2==null) {
+			  _log.error("NO REPLY RECEIVED FOR MESSAGE IN TIMEFRAME - PANIC!");
+			} else {
+			}
+//			MoveIMToSM response=null;
+//			try {
+//				response=(MoveIMToSM)message2.getObject();
+				// remove association
+				synchronized (map) {
+					map.remove(key);
+				}
+				// send GetSMToPM to PM
+				//Destination pm=(Destination)get.getPM();
+				_dispatcher.reply(message1,new MoveSMToPM());
+//			} catch (JMSException e) {
+//			  _log.error("unexpected problem", e);
+//			}
+		} finally {
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(SM)] - " + key + " - releasing sync(" + sync + ")..." + " <" + Thread.currentThread().getName() + ">");
+			sync.release();
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(SM)] - " + key + " - ...sync(" + sync + ") released" + " <" + Thread.currentThread().getName() + ">");
+		}
+	}
+
+
+	//--------------------------------------------------------------------------------
+	// Put
+	//--------------------------------------------------------------------------------
+
+	// called on IM...
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.sandbox.gridstate.StateManager#put(java.io.Object, java.io.Object, boolean, boolean)
+	 */
+	public Object put(Object key, Object value, boolean overwrite, boolean returnOldValue) {
+		boolean removal=(value==null);
+		Map map=_config.getMap();
+		Sync sync=null;
+		String agent=_nodeName;
+		try {
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(IM)] - " + key + " - acquiring sync(" + sync + ")..." + " <" + Thread.currentThread().getName() + ">");
+			sync=_config.getSMSyncs().acquire(key);
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(IM)] - " + key + " - ...sync(" + sync + ") acquired" + " <" + Thread.currentThread().getName() + ">");
+
+			if (!removal) { // removals must do the round trip to PM
+				boolean local;
+				synchronized (map) {
+					local=map.containsKey(key);
+				}
+
+				if (local) {
+					// local
+					if (overwrite) {
+						synchronized (map) {
+							Object oldValue=map.put(key, value);
+							return returnOldValue?oldValue:null;
+						}
+					} else {
+						return Boolean.FALSE;
+					}
+				}
+			}
+
+			// absent or remote
+			// exchangeSendLoop PutIMToPM to PM
+			Destination im=_dispatcher.getLocalDestination();
+			Destination pm=_config.getPartition(key).getDestination();
+			WriteIMToPM request=new WriteIMToPM(key, value==null, overwrite, returnOldValue, im);
+			ObjectMessage message=_dispatcher.exchangeSendLoop(im, pm, request, _timeout, 10);
+			Object response=null;
+			try {
+				response=message.getObject();
+			} catch (JMSException e) {
+			  _log.error("unexpected problem", e); // should be in loop - TODO
+			}
+
+			// 2 possibilities -
+			// PutPM2IM - Absent
+			if (response instanceof WritePMToIM) {
+				if (overwrite) {
+					synchronized (map) {
+						Object oldValue=(removal?map.remove(key):map.put(key, value));
+						return returnOldValue?oldValue:null;
+					}
+				} else {
+					if (((WritePMToIM)response).getSuccess()) {
+						synchronized (map) {
+							map.put(key, value);
+						}
+						return Boolean.TRUE;
+					} else {
+						return Boolean.FALSE;
+					}
+				}
+			} else if (response instanceof MoveSMToIM) {
+				// Present - remote
+				// reply GetIMToSM to SM
+				_dispatcher.reply(message, new MoveIMToSM());
+				synchronized (map) {
+					if (removal)
+						map.remove(key);
+					else
+						map.put(key, value);
+				}
+				return ((MoveSMToIM)response).getValue();
+			} else {
+                if (_log.isErrorEnabled()) _log.error("unexpected response: " + response.getClass().getName());
+				return null;
+			}
+
+		} finally {
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(IM)] - " + key + " - releasing sync(" + sync + ")..." + " <" + Thread.currentThread().getName() + ">");
+			sync.release();
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(IM)] - " + key + " - ...sync(" + sync + ") released" + " <" + Thread.currentThread().getName() + ">");
+		}
+	}
+
+	// called on PM...
+	public void onMessage(ObjectMessage message1, WriteIMToPM write) {
+		// what if we are NOT the PM anymore ?
+		Object key=write.getKey();
+		PartitionFacade partition=_config.getPartition(key);
+		Map partitionMap=partition.getMap();
+		Sync sync=null;
+		String agent=_dispatcher.getNodeName((Destination)write.getIM());
+		try {
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(PM)] - " + key + " - acquiring sync(" + sync + ")..." + " <" + Thread.currentThread().getName() + ">");
+			sync=_config.getPMSyncs().acquire(key);
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(PM)] - " + key + " - ...sync(" + sync + ") acquired" + " <" + Thread.currentThread().getName() + ">");
+			Location location=write.getValueIsNull()?null:new Location(write.getIM());
+			// remove or update location, remembering old value
+			Location oldLocation=(Location)(location==null?partitionMap.remove(key):partitionMap.put(key, location));
+			// if we are not allowed to overwrite, and we have...
+			if (!write.getOverwrite() && oldLocation!=null) {
+				//  undo our change
+				partitionMap.put(key, oldLocation);
+				// send PMToIM - failure
+				_dispatcher.reply(message1, new WritePMToIM(false));
+			} else if (oldLocation==null || (write.getIM().equals(oldLocation.getValue()))) {
+				// if there was previously no SM, or there was, but it was IM ...
+				// then there is no need to go and remove the old value from the old SM
+				// send PMToIM - success
+				_dispatcher.reply(message1, new WritePMToIM(true));
+			} else {
+				// previous value needs removing and possibly returning...
+				// send PMToSM...
+
+				String poCorrelationId=null;
+				try {
+					poCorrelationId=_dispatcher.getOutgoingCorrelationId(message1);
+					//_log.info("Process Owner Correlation ID: "+poCorrelationId);
+				} catch (Exception e) {
+				  _log.error("unexpected problem", e);
+				}
+				Destination im=(Destination)write.getIM();
+				Destination pm=_dispatcher.getLocalDestination();
+				Destination sm=(Destination)oldLocation.getValue();
+				MovePMToSM request=new MovePMToSM(key, im, pm, poCorrelationId);
+				/*ObjectMessage message2=*/_dispatcher.exchangeSendLoop(pm, sm, request, _timeout, 10);
+//				MoveSMToPM response=null;
+//				try {
+//				response=(MoveSMToPM)message2.getObject();
+//				} catch (JMSException e) {
+//				_log.error("unexpected problem", e); // should be sorted in loop
+//				}
+			}
+
+		} finally {
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(PM)] - " + key + " - releasing sync(" + sync + ")..." + " <" + Thread.currentThread().getName() + ">");
+			sync.release();
+            if (_log.isTraceEnabled()) _log.trace("[" + agent + "@" + _nodeName + "(PM)] - " + key + " - ...sync(" + sync + ") released" + " <" + Thread.currentThread().getName() + ">");
+		}
+	}
+
+	//--------------------------------------------------------------------------------
+	// Remove
+	//--------------------------------------------------------------------------------
+
+	// called on IM...
+	public Object remove(Object key, boolean returnOldValue) {
+		return put(key, null, true, returnOldValue); // a remove is a put(key, null)...
+	}
+
+	//--------------------------------------------------------------------------------
+	// StateManager
+	//--------------------------------------------------------------------------------
+
+	public Object syncRpc(Destination destination, Object message) throws Exception {
+		ObjectMessage tmp=_dispatcher.exchangeSendLoop(_dispatcher.getLocalDestination(), (Destination)destination, (Serializable)message, _timeout, 10);
+		Object response=null;
+		try {
+			response=tmp.getObject();
+		} catch (JMSException e) {
+		  _log.error("unexpected problem", e); // should be in loop - TODO
+		}
+		return response;
+	}
+
+//	public Object getLocalLocation() {
+//		return _dispatcher.getLocalDestination();
+//	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/LocalPartition.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/LocalPartition.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/LocalPartition.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/LocalPartition.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,81 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Destination;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.gridstate.PartitionConfig;
+import org.codehaus.wadi.gridstate.Partition;
+import org.codehaus.wadi.impl.Utils;
+
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.ReaderPreferenceReadWriteLock;
+
+public class LocalPartition implements Partition {
+
+	protected static final Log _log=LogFactory.getLog(LocalPartition.class);
+
+	protected final transient ReadWriteLock _lock;
+	protected Map _map=new HashMap();
+
+	protected PartitionConfig _config;
+
+	public LocalPartition() {
+		_lock=new ReaderPreferenceReadWriteLock();
+	}
+
+	public void init(PartitionConfig config) {
+		_config=config;
+	}
+
+	public Destination getDestination() {
+		return _config.getLocalDestination();
+	}
+
+	public Location getLocation(Object key) {
+		try {
+			Utils.safeAcquire(_lock.readLock());
+			return (Location)_map.get(key);
+		} finally {
+			_lock.readLock().release();
+		}
+	}
+
+	public ReadWriteLock getLock() {
+		return _lock;
+	}
+
+	public Map getMap() {
+		return _map;
+	}
+
+	// PMPartition API
+	
+	public boolean isLocal() {
+		return true;
+	}
+	
+	public int getKey() {
+		throw new UnsupportedOperationException("NYI");
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/Location.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/Location.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/Location.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/Location.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,53 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.impl;
+
+import java.io.Serializable;
+
+import javax.jms.Destination;
+
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.ReaderPreferenceReadWriteLock;
+
+
+public class Location implements Serializable {
+
+	protected transient ReadWriteLock _lock;
+	protected transient boolean _invalid;
+	protected Destination _destination;
+
+	public Location(Destination destination) {
+		_lock=new ReaderPreferenceReadWriteLock();
+		_destination=destination;
+	}
+
+	public ReadWriteLock getLock() {
+		return _lock;
+	}
+
+	public void invalidate() {
+		_invalid=true;
+	}
+
+	public Object getValue() {
+		return _destination;
+	}
+
+	public void setValue(Object destination) {
+		_destination=(Destination)destination;
+	}
+}
\ No newline at end of file

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/PartitionFacade.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/PartitionFacade.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/PartitionFacade.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/PartitionFacade.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,66 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.impl;
+
+import java.util.Map;
+
+import javax.jms.Destination;
+
+import org.codehaus.wadi.gridstate.PartitionConfig;
+import org.codehaus.wadi.gridstate.Partition;
+
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+
+public class PartitionFacade implements Partition {
+
+	protected final Partition _partition;
+
+	PartitionFacade(Partition partition) {
+		_partition=partition;
+	}
+
+	public void init(PartitionConfig config) {
+		_partition.init(config);
+	}
+
+	public Destination getDestination() {
+		return _partition.getDestination();
+	}
+
+	public Location getLocation(Object key) {
+		return _partition.getLocation(key);
+	}
+
+	public ReadWriteLock getLock() {
+		return _partition.getLock();
+	}
+
+	public Map getMap() {
+		return _partition.getMap();
+	}
+
+	// PMPartition API
+	
+	public boolean isLocal() {
+		return _partition.isLocal();
+	}
+	
+	public int getKey() {
+		throw new UnsupportedOperationException("NYI");
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/RemotePartition.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/RemotePartition.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/RemotePartition.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/RemotePartition.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,72 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.impl;
+
+import java.util.Map;
+
+import javax.jms.Destination;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.gridstate.PartitionConfig;
+import org.codehaus.wadi.gridstate.Partition;
+
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+
+public class RemotePartition implements Partition {
+
+	protected final Log _log=LogFactory.getLog(getClass().getName());
+	protected final Destination _destination;
+
+	public RemotePartition(Destination destination) {
+		_destination=destination;
+	}
+
+	protected PartitionConfig _config;
+
+	public void init(PartitionConfig config) {
+		_config=config;
+	}
+
+	public Destination getDestination() {
+		return _destination;
+	}
+
+	public Location getLocation(Object key) {
+		throw new UnsupportedOperationException("What should we do here?");
+	}
+
+	public ReadWriteLock getLock() {
+		throw new UnsupportedOperationException("What should we do here?");
+	}
+
+	public Map getMap() {
+		throw new UnsupportedOperationException("What should we do here?");
+	}
+
+
+	// PMPartition API
+	
+	public boolean isLocal() {
+		return false;
+	}
+	
+	public int getKey() {
+		throw new UnsupportedOperationException("NYI");
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/SmartLockManager.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/SmartLockManager.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/SmartLockManager.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/SmartLockManager.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,122 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.gridstate.LockManager;
+import org.codehaus.wadi.impl.Utils;
+
+import EDU.oswego.cs.dl.util.concurrent.Mutex;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+
+// FIXME - DOES NOT WORK :-(
+
+/**
+ * Creates, reuses and destroys, on-the-fly, a lock for a given Key.
+ *
+ * @author jules
+ *
+ */
+public class SmartLockManager implements LockManager {
+
+	/**
+	 * Like a ManagedConnection in a JCA ConnectionPool... - but simpler :-)
+	 *
+	 */
+	class ManagedSync implements Sync {
+
+		protected final Object _key;
+		protected final Sync _sync;
+		protected int _count;
+
+		ManagedSync(Object key) {
+			_key=key;
+			_sync=new Mutex();
+			_count=0;
+		}
+
+		public void acquire() throws InterruptedException {
+			inc();
+			_sync.acquire();
+		}
+
+		public boolean attempt(long msecs) throws InterruptedException {
+			inc();
+			if(_sync.attempt(msecs)) {
+				return true;
+			} else {
+				dec();
+				return false;
+			}
+
+		}
+
+		public void release() {
+			_sync.release();
+			dec();
+		}
+
+		protected void inc() {
+			_count++;
+			//_log.info("inc: "+_key+" ->"+_count+" : "+Thread.currentThread().getName());
+		}
+
+		protected void dec() {
+			synchronized (_syncs) {
+				if (--_count==0) {
+					_syncs.remove(_key);
+					//_log.info("dec: "+_key+" ->"+_count+" : "+Thread.currentThread().getName());
+					//_log.info("destroyed: "+_key+" : "+Thread.currentThread().getName());
+				} else {
+					//_log.info("dec: "+_key+" ->"+_count+" : "+Thread.currentThread().getName());
+				}
+			}
+		}
+	}
+
+	protected final String _name;
+	//protected final Log _log;
+	protected final Map _syncs;
+
+	public SmartLockManager(String name) {
+		_name=name;
+		//_log=LogFactory.getLog(getClass().getName()+"#"+hashCode());
+		_syncs=new HashMap();
+	}
+
+	public Sync acquire(Object key) {
+		Sync sync;
+		synchronized (_syncs) {
+			if ((sync=(Sync)_syncs.get(key))==null) {
+				_syncs.put(key, (sync=new ManagedSync(key)));
+				//_log.info("created: "+key+" : "+Thread.currentThread().getName());
+			}
+			Utils.safeAcquire(sync);
+		}
+
+		return sync;
+	}
+
+	public Sync acquire(Object key, Object value) {
+		return acquire(key);
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/StaticPartitionManager.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/StaticPartitionManager.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/StaticPartitionManager.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/StaticPartitionManager.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,106 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.impl;
+
+import javax.jms.Destination;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.gridstate.Dispatcher;
+import org.codehaus.wadi.gridstate.PartitionManager;
+import org.codehaus.wadi.gridstate.PartitionManagerConfig;
+import org.codehaus.wadi.gridstate.PartitionMapper;
+
+public class StaticPartitionManager implements PartitionManager {
+
+	protected final Dispatcher _dispatcher;
+	protected final PartitionFacade[] _partitions;
+	protected final PartitionMapper _mapper;
+
+	public StaticPartitionManager(Dispatcher dispatcher, int numPartitions, PartitionMapper mapper) {
+		_dispatcher=dispatcher;
+		_partitions=new PartitionFacade[numPartitions];
+		_mapper=mapper;
+	}
+
+	public void init(PartitionManagerConfig config) {
+		for (int i=0; i<_partitions.length; i++) {
+			_partitions[i].init(this);
+		}
+	}
+
+	public PartitionFacade[] getPartitions() {
+		return _partitions;
+	}
+
+	public int getNumPartitions() {
+		return _partitions.length;
+	}
+
+    protected final static Log _log=LogFactory.getLog(StaticPartitionManager.class);
+
+    // MUST be called once to distribute the Partitions between Caches and initialise them correctly before they are used...
+	public static void partition(GCache[] caches, PartitionManager[] managers, int numPartitions) {
+		int numCaches=caches.length;
+    	// initialise the partitions...
+    	int partitionsPerCache=numPartitions/numCaches;
+    	for (int i=0; i<numPartitions; i++) {
+    		// figure out which node is Partition Master...
+    		int index=i/partitionsPerCache;
+    		GCache master=caches[index];
+            if (_log.isInfoEnabled()) _log.info("partition-" + i + " -> node-" + index);
+    		// go through all the nodes...
+    		for (int j=0; j<numCaches; j++) {
+    			GCache cache=caches[j];
+    			if (cache!=master) {
+    				// if node is not PartitionMaster - make partition remote, pointing to PartitionMaster
+    				PartitionFacade partition=new PartitionFacade(new RemotePartition(master.getLocalDestination()));
+    				partition.init(cache.getPartitionConfig());
+    				cache.getPartitions()[i]=partition;
+    			} else {
+    				PartitionFacade partition=new PartitionFacade(new LocalPartition());
+    				cache.getPartitions()[i]=partition;
+    			}
+    		}
+    	}
+	}
+
+	public PartitionFacade getPartition(Object key) {
+		return _partitions[_mapper.map(key)];
+	}
+
+	public void start() throws Exception {
+		// empty
+	}
+
+	public void stop() throws Exception {
+		// empty
+	}
+
+	public Dispatcher getDispatcher() {
+		return _dispatcher;
+	}
+
+	public Destination getLocalDestination() {
+		return _dispatcher.getLocalDestination();
+	}
+
+	public void evacuate() {
+		throw new UnsupportedOperationException("NYI");
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/StupidLockManager.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/StupidLockManager.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/StupidLockManager.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/impl/StupidLockManager.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,70 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.gridstate.LockManager;
+import org.codehaus.wadi.impl.Utils;
+
+import EDU.oswego.cs.dl.util.concurrent.Mutex;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+
+/**
+ * Creates and reuses, on-the-fly, a lock for a given Key, but will never destroy the lock - temporary solution...
+ *
+ * @author jules
+ *
+ */
+public class StupidLockManager implements LockManager {
+
+	protected static final Log _log=LogFactory.getLog(StupidLockManager.class);
+
+	protected String _prefix;
+
+	public StupidLockManager(String prefix) {
+		_prefix=prefix;
+	}
+
+	protected Map _syncs=new HashMap(); // was a WeakHashMap, assocs removed as keys fall out of use... - good idea ?
+
+	/* (non-Javadoc)
+	 * @see org.codehaus.wadi.sandbox.gridstate.LockManagerAPI#acquire(java.lang.Object)
+	 */
+	public Sync acquire(Object key) {
+		return acquire(key, null);
+	}
+
+	public Sync acquire(Object key, Object value) {
+		// value not used - locks are always held externally...
+		Sync sync=null;
+		synchronized (_syncs) {
+			if ((sync=(Sync)_syncs.get(key))==null) {
+					_syncs.put(key, (sync=new Mutex()));
+                if (_log.isTraceEnabled()) _log.trace("[" + _prefix + "] created sync: " + key + " - " + this + " - " + sync);
+			}
+		}
+        if (_log.isTraceEnabled()) _log.trace("[" + _prefix + "] trying to acquire sync for: " + key + " - " + this + " - " + sync);
+		Utils.safeAcquire(sync);
+        if (_log.isTraceEnabled()) _log.trace("[" + _prefix + "] sync acquired for: " + key + " - " + this + " - " + sync);
+		return sync;
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/messages/ReadIMToPM.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/messages/ReadIMToPM.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/messages/ReadIMToPM.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/messages/ReadIMToPM.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,49 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.messages;
+
+import java.io.Serializable;
+
+import javax.jms.Destination;
+
+public class ReadIMToPM implements Serializable {
+
+	protected Object _key;
+	protected Destination _im;
+
+	public ReadIMToPM(Object key, Destination im) {
+		_key=key;
+		_im=im;
+	}
+
+	protected ReadIMToPM() {
+		// for deserialisation ...
+	}
+
+	public Object getKey() {
+		return _key;
+	}
+
+	public Destination getIM() {
+		return _im;
+	}
+	
+	public String toString() {
+		return "<ReadIMToPM:"+_key+">";
+	}
+	
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/messages/WriteIMToPM.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/messages/WriteIMToPM.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/messages/WriteIMToPM.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/messages/WriteIMToPM.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,63 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.messages;
+
+import java.io.Serializable;
+
+import javax.jms.Destination;
+
+public class WriteIMToPM implements Serializable {
+
+	protected Object _key;
+	protected boolean _valueIsNull;
+	protected boolean _overwrite;
+	protected boolean _returnOldValue;
+	protected Destination _im;
+
+	public WriteIMToPM(Object key, boolean valueIsNull, boolean overwrite, boolean returnOldValue, Destination im) {
+		_key=key;
+		_valueIsNull=valueIsNull;
+		_overwrite=overwrite;
+		_returnOldValue=returnOldValue;
+		_im=im;
+	}
+
+	protected WriteIMToPM() {
+		// for deserialisation...
+	}
+
+	public Object getKey() {
+		return _key;
+	}
+
+	public boolean getValueIsNull() {
+		return _valueIsNull;
+	}
+
+	public boolean getOverwrite() {
+		return _overwrite;
+	}
+
+	public boolean getReturnOldValue() {
+		return _returnOldValue;
+	}
+
+	public Destination getIM() {
+		return _im;
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/messages/WritePMToIM.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/messages/WritePMToIM.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/messages/WritePMToIM.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/gridstate/messages/WritePMToIM.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,37 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.gridstate.messages;
+
+import java.io.Serializable;
+
+public class WritePMToIM implements Serializable {
+
+	protected boolean _success;
+
+	public WritePMToIM(boolean success) {
+		_success=success;
+	}
+
+	protected WritePMToIM() {
+		// for deserialisation...
+	}
+
+	public boolean getSuccess() {
+		return _success;
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/http/HTTPProxiedLocation.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/http/HTTPProxiedLocation.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/http/HTTPProxiedLocation.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/http/HTTPProxiedLocation.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,37 @@
+/**
+ *
+ * Copyright 2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.http;
+
+import java.net.InetSocketAddress;
+
+import org.codehaus.wadi.ProxiedLocation;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class HTTPProxiedLocation implements ProxiedLocation {
+	private final InetSocketAddress address;
+	
+	public HTTPProxiedLocation(InetSocketAddress address) {
+		this.address = address;
+	}
+	
+	public InetSocketAddress getInetSocketAddress() {
+		return address;
+	}
+	
+}



Mime
View raw message