geronimo-scm mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgenen...@apache.org
Subject svn commit: r418140 - in /geronimo/sandbox/geronimo-cache: ./ src/main/java/org/apache/geronimo/cache/ src/main/java/org/apache/geronimo/cache/comm/ src/main/java/org/apache/geronimo/cache/comm/aioimpl/ src/main/java/org/apache/geronimo/cache/comm/amqi...
Date Thu, 29 Jun 2006 21:20:50 GMT
Author: jgenender
Date: Thu Jun 29 14:20:49 2006
New Revision: 418140

URL: http://svn.apache.org/viewvc?rev=418140&view=rev
Log:
GERONIMO-2046 JMS back end for clustering

Added:
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/CacheCommListener.java   (with props)
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/CacheChannelAMQImpl.java   (with props)
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterReceiverRunnable.java   (with props)
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterSenderRunnable.java   (with props)
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/CacheChannelUDPImpl.java   (with props)
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterReceiverRunnable.java   (with props)
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterSenderRunnable.java   (with props)
    geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/amqimpl/
    geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/amqimpl/AMQImplChannelTest.java   (with props)
Removed:
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/impl/
Modified:
    geronimo/sandbox/geronimo-cache/pom.xml
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/CacheManager.java
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/CacheChannel.java
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/JoinedEvent.java
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/MessageID.java
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/ProvideContentEvent.java
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/RemoteEvent.java
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/aioimpl/CacheActiveIOCommListener.java
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/aioimpl/CacheChannelActiveIOImpl.java
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/impl/LocalCacheImpl.java
    geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/impl/ReplicatedCacheImpl.java
    geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/MockCacheChannel.java
    geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/RunCacheStuff.java

Modified: geronimo/sandbox/geronimo-cache/pom.xml
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/pom.xml?rev=418140&r1=418139&r2=418140&view=diff
==============================================================================
--- geronimo/sandbox/geronimo-cache/pom.xml (original)
+++ geronimo/sandbox/geronimo-cache/pom.xml Thu Jun 29 14:20:49 2006
@@ -1,3 +1,4 @@
+<?xml version="1.0"?>
 <!-- 
     * Copyright 2005-2006 The Apache Software Foundation.
     *
@@ -21,7 +22,6 @@
     <name>Caching</name>
     <packaging>jar</packaging>
     <description>replicated cache session implementation</description>
-    
     <prerequisites>
         <maven>2.0.4</maven>
     </prerequisites>
@@ -83,7 +83,25 @@
         <dependency>
             <groupId>incubator-activemq</groupId>
             <artifactId>activeio-core</artifactId>
-            <version>3.0-beta3</version>
+            <version>3.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>incubator-activemq</groupId>
+            <artifactId>activemq-core</artifactId>
+            <version>4.1-SNAPSHOT</version>
+        </dependency>
+	<!-- derby and management should be picked up transitively 
+	  but they are not -->
+        <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derby</artifactId>
+            <version>10.1.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-j2ee-management_1.0_spec</artifactId>
+            <version>1.0.1</version>
+            <optional>true</optional>
         </dependency>
     </dependencies>
     <build>
@@ -130,4 +148,4 @@
             </plugin>
         </plugins>
     </reporting>
-</project>
\ No newline at end of file
+</project>

Modified: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/CacheManager.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/CacheManager.java?rev=418140&r1=418139&r2=418140&view=diff
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/CacheManager.java (original)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/CacheManager.java Thu Jun 29 14:20:49 2006
@@ -26,7 +26,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.geronimo.cache.comm.impl.CacheChannelUDPImpl;
+import org.apache.geronimo.cache.comm.udpimpl.CacheChannelUDPImpl;
 import org.apache.geronimo.cache.config.CacheConfig;
 import org.apache.geronimo.cache.config.CacheStorageConfig;
 import org.apache.geronimo.cache.xmlbeans.GCacheConfigDocument;
@@ -58,10 +58,18 @@
 			log.error("could not parse configuration", e);
 			throw new CacheConfigurationException(
 					"could not parse configuration", e);
+		} catch (Exception e) {
+			log.error("error parsing configuration", e);
+			throw new CacheConfigurationException(
+					"error parsing configuration", e);
 		}
 	}
 
 	public CacheManager(ClassLoader loader) {
+		if (loader == null) {
+			throw new CacheConfigurationException(
+					"ClassLoader must not be null");
+		}
 		classLoader = loader;
 		try {
 			parseConfiguration();
@@ -99,14 +107,18 @@
 			if (file.exists()) {
 				stream = new FileInputStream(file);
 			} else {
-				URL configURL = Thread.currentThread().getContextClassLoader()
-						.getResource(overridePath);
+				URL configURL = classLoader.getResource(overridePath);
 				stream = configURL.openStream();
 			}
 		} else {
-			// use the classpath
-			URL configURL = Thread.currentThread().getContextClassLoader()
+			// use the classloader specified
+			URL configURL = classLoader
 					.getResource("META-INF/geronimo-cache-config.xml");
+			if (null == configURL) {
+				throw new CacheConfigurationException(
+						"geronimo-cache-config.xml (the default config file) could not be found, "
+								+ "please ensure your classpath includes the cache jar file");
+			}
 			stream = configURL.openStream();
 		}
 		parseConfiguration(stream);
@@ -128,10 +140,14 @@
 		Iterator itr = cacheConfigs.keySet().iterator();
 		while (itr.hasNext()) {
 			String key = (String) itr.next();
-			CacheConfig config = (CacheConfig) cacheConfigs.get(key);
+			Object testConfig = cacheConfigs.get(key);
+			assertValidConfig(testConfig, key);
+			CacheConfig config = (CacheConfig) testConfig;
 			try {
 				Class cacheImpl = Class.forName(config.getCacheImpl(), true,
 						classLoader);
+				// TODO: if Cache and the impl come from different class loaders
+				// this will end up in a class cast exception
 				Cache cache = (Cache) cacheImpl.newInstance();
 				cache.setConfiguration(config);
 				caches.put(config.getName(), cache);
@@ -152,6 +168,26 @@
 				log.error(msg, e);
 				throw new CacheConfigurationException(msg, e);
 			}
+		}
+	}
+
+	private void assertValidConfig(Object testConfig, String key) {
+		if (!(testConfig instanceof CacheConfig)) {
+			throw new CacheConfigurationException("Configuration for " + key
+					+ " is not a valid configuration type");
+		}
+		CacheConfig config = (CacheConfig) testConfig;
+		if (null == config) {
+			throw new CacheConfigurationException("Configuration for " + key
+					+ " is null and must not be");
+		}
+		if (null == config.getCacheImpl()) {
+			throw new CacheConfigurationException("Implementation class for "
+					+ key + " is null and must not be");
+		}
+		if (null == config.getName() || config.getName().length() == 0) {
+			throw new CacheConfigurationException("Cache name is empty for "
+					+ key + " and must not be");
 		}
 	}
 

Modified: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/CacheChannel.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/CacheChannel.java?rev=418140&r1=418139&r2=418140&view=diff
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/CacheChannel.java (original)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/CacheChannel.java Thu Jun 29 14:20:49 2006
@@ -42,7 +42,7 @@
 	 * This method blocks until the update is received.
 	 * @param update
 	 */
-	public void recieve(RemoteEvent update);
+	public void receive(RemoteEvent update);
 	/**
 	 * close the channel
 	 */

Added: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/CacheCommListener.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/CacheCommListener.java?rev=418140&view=auto
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/CacheCommListener.java (added)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/CacheCommListener.java Thu Jun 29 14:20:49 2006
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.cache.comm;
+
+import org.apache.geronimo.cache.CacheEvent;
+import org.apache.geronimo.cache.CacheException;
+import org.apache.geronimo.cache.CacheListener;
+
+/**
+ * This listener is added to the replicated cache and is what sends out the
+ * changes messages to the rest of the cluster.
+ *
+ * $Rev$ $Date$
+ */
+public class CacheCommListener implements CacheListener {
+	private CacheChannel channel = null;
+
+	public CacheCommListener(CacheChannel channel) {
+		if(channel == null) {
+			throw new CacheException("channel must not be null");
+		}
+		this.channel = channel;
+	}
+
+	public void entryAdded(CacheEvent event) {
+		RemoteAddEvent addEvent = new RemoteAddEvent();
+		addEvent.setCacheName(event.getCache().getName());
+		addEvent.setCacheId(event.getCache().getId());
+		addEvent.setKey(event.getKey());
+		addEvent.setNewValue(event.getNewValue());
+		channel.send(addEvent);
+	}
+
+	public void entryRemoved(CacheEvent event) {
+		RemoteRemoveEvent removeEvent = new RemoteRemoveEvent();
+		removeEvent.setCacheName(event.getCache().getName());
+		removeEvent.setCacheId(event.getCache().getId());
+		removeEvent.setKey(event.getKey());
+		removeEvent.setOldValue(event.getOldValue());
+		channel.send(removeEvent);
+	}
+
+	public void entryUpdated(CacheEvent event) {
+		RemoteUpdateEvent updateEvent = new RemoteUpdateEvent();
+		updateEvent.setCacheName(event.getCache().getName());
+		updateEvent.setCacheId(event.getCache().getId());
+		updateEvent.setKey(event.getKey());
+		updateEvent.setNewValue(event.getNewValue());
+		updateEvent.setOldValue(event.getOldValue());
+		channel.send(updateEvent);
+	}
+
+}

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/CacheCommListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/CacheCommListener.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/CacheCommListener.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/JoinedEvent.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/JoinedEvent.java?rev=418140&r1=418139&r2=418140&view=diff
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/JoinedEvent.java (original)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/JoinedEvent.java Thu Jun 29 14:20:49 2006
@@ -15,8 +15,7 @@
  */
 package org.apache.geronimo.cache.comm;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.cache.CacheException;
 import org.apache.geronimo.cache.ReplicatedCache;
 
 /**
@@ -33,11 +32,12 @@
 public class JoinedEvent extends RemoteEvent {
 	private static final long serialVersionUID = 1L;
 
-	private Log log = LogFactory.getLog(JoinedEvent.class);
-
 	public void invoke(ReplicatedCache cache) {
-		if (log.isDebugEnabled()) {
-			log.debug("processing a joined event for " + getCacheId());
+		if(cache == null) {
+			throw new CacheException("Invalid invocation of a JoinedEvent: cache may not be null");
+		}
+		if (getLog().isDebugEnabled()) {
+            getLog().debug("processing a joined event for " + getCacheId());
 		}
 		// when the joined event is sent everyone responds with "i've got it"
 		ProvideContentEvent event = new ProvideContentEvent();
@@ -47,5 +47,5 @@
 		event.setValues(cache.getAll(cache.keySet()));
 		cache.getChannel().send(event);
 	}
-
+    
 }

Modified: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/MessageID.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/MessageID.java?rev=418140&r1=418139&r2=418140&view=diff
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/MessageID.java (original)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/MessageID.java Thu Jun 29 14:20:49 2006
@@ -40,7 +40,7 @@
 public class MessageID {
 	private static final int HASH_LENTGH = 8;
 
-	private Log log = LogFactory.getLog(MessageID.class);
+	private transient Log log = LogFactory.getLog(MessageID.class);
 
 	private UID uniqueIdentifier = new UID();
 
@@ -87,6 +87,9 @@
 	public static MessageID read(DataInput in) throws IOException {
 		UID uid = UID.read(in);
 		int length = in.readInt();
+		if(length <= 0) {
+			throw new CacheException("The length " + length + " is invalid");
+		}
 		byte data[] = new byte[length];
 		for (int i = 0; i < length; i++) {
 			data[i] = in.readByte();

Modified: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/ProvideContentEvent.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/ProvideContentEvent.java?rev=418140&r1=418139&r2=418140&view=diff
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/ProvideContentEvent.java (original)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/ProvideContentEvent.java Thu Jun 29 14:20:49 2006
@@ -17,9 +17,6 @@
 
 import java.rmi.server.UID;
 import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.geronimo.cache.ReplicatedCache;
 
 /**
@@ -31,8 +28,6 @@
 public class ProvideContentEvent extends RemoteEvent {
 	private static final long serialVersionUID = 1L;
 
-	private Log log = LogFactory.getLog(ProvideContentEvent.class);
-
 	private Map values = null;
 
 	private UID destinationCacheId = null;
@@ -54,14 +49,13 @@
 	}
 
 	public void invoke(ReplicatedCache cache) {
-		if (log.isDebugEnabled()) {
-			log.debug("processing a provide content event for "
+		if (getLog().isDebugEnabled()) {
+			getLog().debug("processing a provide content event for "
 					+ getDestinationCacheId());
 		}
 		if (cache.getId().equals(this.getDestinationCacheId())) {
-			if (log.isDebugEnabled()) {
-				log.debug("provide content event - origin = " + getOrigin()
-						+ " source id = " + getCacheId());
+			if (getLog().isDebugEnabled()) {
+                getLog().debug("provide content event - source id = " + getCacheId());
 			}
 			cache.getChannel().setSendEvents(false);
 			cache.putAll(getValues());

Modified: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/RemoteEvent.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/RemoteEvent.java?rev=418140&r1=418139&r2=418140&view=diff
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/RemoteEvent.java (original)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/RemoteEvent.java Thu Jun 29 14:20:49 2006
@@ -16,60 +16,50 @@
 package org.apache.geronimo.cache.comm;
 
 import java.io.Serializable;
-import java.net.SocketAddress;
 import java.rmi.server.UID;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.geronimo.cache.ReplicatedCache;
 
 /**
- * First cut at remote event propigation. This will be replaced with a protocol insted of sending
- * serialized objects over the net.
- *
+ * First cut at remote event propagation. This will be replaced with a protocol
+ * instead of sending serialized objects over the net.
+ * 
  * $Rev$ $Date$
  */
 public abstract class RemoteEvent implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private Object key = null;
-
-	private String cacheName = null;
-	
-	private UID cacheId = null;
-	
-	private SocketAddress origin = null;
-
-	public String getCacheName() {
-		return cacheName;
-	}
-
-	public void setCacheName(String cacheName) {
-		this.cacheName = cacheName;
-	}
-
-	public UID getCacheId() {
-		return cacheId;
-	}
-
-	public void setCacheId(UID cacheId) {
-		this.cacheId = cacheId;
-	}
-
-	public Object getKey() {
-		return key;
-	}
-
-	public void setKey(Object key) {
-		this.key = key;
-	}
-	
-	public SocketAddress getOrigin() {
-		return origin;
-	}
-
-	public void setOrigin(SocketAddress origin) {
-		this.origin = origin;
-	}
-
-	public abstract void invoke(ReplicatedCache cache);
+    private static final long serialVersionUID = 1L;
+    private Object key = null;
+    private String cacheName = null;
+    private UID cacheId = null;
+    private transient Log log = null;
+    public String getCacheName() {
+        return cacheName;
+    }
+
+    public void setCacheName(String cacheName) {
+        this.cacheName = cacheName;
+    }
+
+    public UID getCacheId() {
+        return cacheId;
+    }
+
+    public void setCacheId(UID cacheId) {
+        this.cacheId = cacheId;
+    }
+
+    public Object getKey() {
+        return key;
+    }
+
+    public void setKey(Object key) {
+        this.key = key;
+    }
+
+    protected Log getLog() {
+        return log == null ? log = LogFactory.getLog(this.getClass()) : log;
+    }
 
+    public abstract void invoke(ReplicatedCache cache);
 }

Modified: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/aioimpl/CacheActiveIOCommListener.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/aioimpl/CacheActiveIOCommListener.java?rev=418140&r1=418139&r2=418140&view=diff
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/aioimpl/CacheActiveIOCommListener.java (original)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/aioimpl/CacheActiveIOCommListener.java Thu Jun 29 14:20:49 2006
@@ -16,6 +16,7 @@
 package org.apache.geronimo.cache.comm.aioimpl;
 
 import org.apache.geronimo.cache.CacheEvent;
+import org.apache.geronimo.cache.CacheException;
 import org.apache.geronimo.cache.CacheListener;
 import org.apache.geronimo.cache.comm.CacheChannel;
 import org.apache.geronimo.cache.comm.RemoteAddEvent;
@@ -33,6 +34,9 @@
 	private CacheChannel channel = null;
 
 	public CacheActiveIOCommListener(CacheChannel channel) {
+		if(channel == null) {
+			throw new CacheException("channel must not be null");
+		}
 		this.channel = channel;
 	}
 

Modified: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/aioimpl/CacheChannelActiveIOImpl.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/aioimpl/CacheChannelActiveIOImpl.java?rev=418140&r1=418139&r2=418140&view=diff
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/aioimpl/CacheChannelActiveIOImpl.java (original)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/aioimpl/CacheChannelActiveIOImpl.java Thu Jun 29 14:20:49 2006
@@ -37,7 +37,7 @@
 import org.apache.geronimo.cache.comm.CacheCommunicationException;
 import org.apache.geronimo.cache.comm.JoinedEvent;
 import org.apache.geronimo.cache.comm.RemoteEvent;
-import org.apache.geronimo.cache.comm.impl.CacheChannelUDPImpl;
+import org.apache.geronimo.cache.comm.udpimpl.CacheChannelUDPImpl;
 
 /**
  * First cut at the aio impl, does not work
@@ -152,7 +152,7 @@
 		}
 	}
 
-	public void recieve(RemoteEvent event) {
+	public void receive(RemoteEvent event) {
 		if (log.isDebugEnabled()) {
 			log.debug("Recieved update for " + event.getCacheId());
 		}

Added: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/CacheChannelAMQImpl.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/CacheChannelAMQImpl.java?rev=418140&view=auto
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/CacheChannelAMQImpl.java (added)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/CacheChannelAMQImpl.java Thu Jun 29 14:20:49 2006
@@ -0,0 +1,121 @@
+package org.apache.geronimo.cache.comm.amqimpl;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.cache.CacheException;
+import org.apache.geronimo.cache.ReplicatedCache;
+import org.apache.geronimo.cache.comm.CacheChannel;
+import org.apache.geronimo.cache.comm.CacheCommunicationException;
+import org.apache.geronimo.cache.comm.JoinedEvent;
+import org.apache.geronimo.cache.comm.RemoteEvent;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+
+public class CacheChannelAMQImpl implements CacheChannel {
+    private transient Log log = LogFactory.getLog(CacheChannelAMQImpl.class);
+
+    private boolean awake = false;
+    private ThreadLocal sendEvents = new ThreadLocal();
+    private ReplicatedCache cache = null;
+    private String protocol = "vm";
+    private String hostName = "localhost";
+    private String port = "61616";
+    private boolean transacted = false;
+    private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+    private String topicName = "TEST.FOO";
+    private Session session = null;
+    private Connection connection = null;
+    private Destination destination = null;
+    private ClusterReceiverRunnable receiver = null;
+    private ClusterSenderRunnable sender = null;
+    private LinkedBlockingQueue queue = new LinkedBlockingQueue();
+    private ConnectionFactory connectionFactory = null;
+
+    public CacheChannelAMQImpl() {
+        super();
+        configure();
+    }
+
+    public ReplicatedCache getCache() {
+        return cache;
+    }
+
+    public void setCache(ReplicatedCache cache) {
+        if (!awake) {
+            this.cache = cache;
+            try {
+                receiver = new ClusterReceiverRunnable(cache, "receiver", session,
+                        destination);
+            } catch (JMSException e) {
+                throw new CacheCommunicationException("Could not create the receiver thread", e);
+            }
+            sender = new ClusterSenderRunnable("sender", session, destination,
+                    queue);
+            receiver.start();
+            sender.start();
+            awake = true;
+            RemoteEvent event = new JoinedEvent();
+            event.setCacheId(cache.getId());
+            event.setCacheName(cache.getName());
+            if (log.isDebugEnabled()) {
+                log.debug("joining " + cache.getId() + " to the cluster");
+            }
+            send(event);
+        } else {
+            // should we throw here?
+        }
+    }
+
+    public void send(RemoteEvent update) {
+        try {
+            queue.put(update);
+        } catch (InterruptedException e) {
+            throw new CacheException("Failed to put an event onto a cache", e);
+        }
+    }
+
+    public void receive(RemoteEvent update) {
+        update.invoke(cache);
+    }
+
+    public void close() {
+        awake = false;
+        try {
+            cleanup();
+        } catch (JMSException e) {
+            log.error("Could not cleanup the channel", e);
+        }
+    }
+
+    public void setSendEvents(boolean flag) {
+        this.sendEvents.set(new Boolean(flag));
+    }
+
+    private void configure() {
+        // Create a ConnectionFactory
+        connectionFactory = new ActiveMQConnectionFactory(protocol + "://"
+                + hostName); // + ":" + port);
+        // Create a Connection
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+            // Create a Session
+            session = connection.createSession(transacted, acknowledgeMode);
+            // Create the destination (Topic or Queue)
+            destination = session.createTopic(topicName);
+        } catch (JMSException e) {
+            throw new CacheCommunicationException("Could not configure the cahnnel", e);
+        }
+    }
+
+    private void cleanup() throws JMSException {
+        // Clean up
+        session.close();
+        connection.close();
+    }
+}

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/CacheChannelAMQImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/CacheChannelAMQImpl.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/CacheChannelAMQImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterReceiverRunnable.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterReceiverRunnable.java?rev=418140&view=auto
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterReceiverRunnable.java (added)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterReceiverRunnable.java Thu Jun 29 14:20:49 2006
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.cache.comm.amqimpl;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.cache.ReplicatedCache;
+import org.apache.geronimo.cache.comm.CacheCommunicationException;
+import org.apache.geronimo.cache.comm.RemoteEvent;
+
+/**
+ * Reads data from the cluster and posts it locally to the cache so the local
+ * cache will be in sync with the remote sender.
+ * 
+ * $Rev$ $Date$
+ */
+class ClusterReceiverRunnable extends Thread {
+    public Log log = LogFactory.getLog(ClusterReceiverRunnable.class);
+    private ReplicatedCache cache = null;
+    private boolean finished = false;
+    private MessageConsumer consumer = null;
+
+    public ClusterReceiverRunnable(ReplicatedCache cache, String name,
+            Session session, Destination destination) throws JMSException {
+        super(name);
+        this.cache = cache;
+        consumer = session.createConsumer(destination);
+    }
+
+    /**
+     * This is ugly - make a better way
+     */
+    public void finished() {
+        finished = true;
+        try {
+            consumer.close();
+        } catch (JMSException e) {
+            log.error("Failed to close the MessageConsumer", e);
+        }
+    }
+
+    public void run() {
+        while (!finished) {
+            try {
+                // wait for a message
+                Message message = consumer.receive();
+                if (message instanceof ObjectMessage) {
+                    ObjectMessage objectMessage = (ObjectMessage) message;
+                    RemoteEvent event = (RemoteEvent) objectMessage.getObject();
+                    if (!cache.getId().equals(event.getCacheId())) {
+                        cache.getChannel().receive(event);
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Ignored receipt for " + cache.getId());
+                        }
+                    }
+                }
+            } catch (JMSException e) {
+                throw new CacheCommunicationException(
+                        "Failed to Receive a JMS message", e);
+            }
+        }
+    }
+}

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterReceiverRunnable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterReceiverRunnable.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterReceiverRunnable.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterSenderRunnable.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterSenderRunnable.java?rev=418140&view=auto
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterSenderRunnable.java (added)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterSenderRunnable.java Thu Jun 29 14:20:49 2006
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.cache.comm.amqimpl;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.cache.comm.CacheCommunicationException;
+import org.apache.geronimo.cache.comm.RemoteEvent;
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Takes {@link RemoteEvent}s off a blocking queue and publishes each one to a
+ * JMS destination as an {@link ObjectMessage}.
+ *
+ * The thread keeps running until {@link #finished()} is called.
+ *
+ * $Rev$ $Date$
+ */
+class ClusterSenderRunnable extends Thread {
+    public Log log = LogFactory.getLog(ClusterSenderRunnable.class);
+    private MessageProducer producer;
+    private Session session = null;
+    // written by finished() on the caller's thread and polled by run(), so it
+    // has to be volatile for the change to be seen
+    private volatile boolean finished = false;
+    private LinkedBlockingQueue queue;
+
+    /**
+     * Creates the sender and its JMS producer; the thread is not started here.
+     *
+     * @param name thread name
+     * @param session JMS session used to create the producer and the messages
+     * @param destination destination the producer publishes to
+     * @param queue queue the events to publish are taken from
+     * @throws CacheCommunicationException if the producer cannot be created
+     */
+    public ClusterSenderRunnable(String name, Session session,
+            Destination destination, LinkedBlockingQueue queue) {
+        super(name);
+        try {
+            this.session = session;
+            this.queue = queue;
+            producer = session.createProducer(destination);
+        } catch (JMSException e) {
+            throw new CacheCommunicationException(
+                    "Failed to create a producer", e);
+        }
+    }
+
+    /**
+     * Asks the thread to stop and closes the producer. A failure to close the
+     * producer is logged, not thrown.
+     */
+    public void finished() {
+        finished = true;
+        // run() is normally parked in queue.take(); without the interrupt it
+        // would only notice the flag after one more event arrived
+        interrupt();
+        try {
+            producer.close();
+        } catch (JMSException e) {
+            log.error("Failed to close producer", e);
+        }
+    }
+
+    /**
+     * Publishes queued events until {@link #finished()} is called.
+     *
+     * @throws CacheCommunicationException if the thread is interrupted or a
+     *         send fails while the sender has not been asked to stop
+     */
+    public void run() {
+        while (!finished) {
+            try {
+                RemoteEvent event = (RemoteEvent) queue.take();
+                ObjectMessage msg = session.createObjectMessage(event);
+                producer.send(msg);
+            } catch (InterruptedException e) {
+                if (finished) {
+                    // the interrupt came from finished(): normal shutdown
+                    return;
+                }
+                // keep the interrupt visible to whoever handles the failure
+                Thread.currentThread().interrupt();
+                throw new CacheCommunicationException("Failed to take an event from the queue", e);
+            } catch (JMSException e) {
+                if (finished) {
+                    // finished() closed the producer while a send was pending
+                    if (log.isDebugEnabled()) {
+                        log.debug("Send abandoned during shutdown", e);
+                    }
+                    return;
+                }
+                throw new CacheCommunicationException("Failed to send the event message", e);
+            }
+        }
+    }
+}

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterSenderRunnable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterSenderRunnable.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/amqimpl/ClusterSenderRunnable.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/CacheChannelUDPImpl.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/CacheChannelUDPImpl.java?rev=418140&view=auto
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/CacheChannelUDPImpl.java (added)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/CacheChannelUDPImpl.java Thu Jun 29 14:20:49 2006
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.cache.comm.udpimpl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Pipe;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.cache.CacheException;
+import org.apache.geronimo.cache.ReplicatedCache;
+import org.apache.geronimo.cache.comm.CacheChannel;
+import org.apache.geronimo.cache.comm.CacheCommunicationException;
+import org.apache.geronimo.cache.comm.JoinedEvent;
+import org.apache.geronimo.cache.comm.RemoteEvent;
+
+/**
+ * UDP impl, works for small object changes
+ * 
+ * $Rev$ $Date$
+ */
+public class CacheChannelUDPImpl implements CacheChannel {
+    public static final int UDP_MAX_PACKET_SIZE = 65507;
+    public static final String DEFAULT_GROUP = "230.2.2.1";
+    public static final int DEFAULT_PORT = 10057;
+    private Log log = LogFactory.getLog(CacheChannelUDPImpl.class);
+    private ReplicatedCache cache = null;
+    private Pipe eventPipe = null;
+    private boolean awake = false;
+    private ClusterSenderRunnable sender;
+    private ClusterReceiverRunnable receiver;
+    private ThreadLocal sendEvents = new ThreadLocal();
+    private String groupName;
+    private int port;
+
+    public CacheChannelUDPImpl() {
+        try {
+            eventPipe = Pipe.open();
+        } catch (IOException e) {
+            String msg = "Could not open pipe for " + cache.getId();
+            log.error(msg, e);
+            throw new CacheException(msg, e);
+        } catch (RuntimeException e) {
+            System.err.println("e = " + e.getMessage());
+            throw e;
+        }
+    }
+
+    public void setSendEvents(boolean shouldSendEvents) {
+        this.sendEvents.set(new Boolean(shouldSendEvents));
+    }
+
+    /*
+     * puts the update object into the pipe that connects the local cache to the
+     * thread waiting to send out the change
+     * 
+     * @see org.apache.geronimo.cache.comm.CacheChannel#send(org.apache.geronimo.cache.comm.RemoteEvent)
+     * 
+     */
+    public void send(RemoteEvent update) {
+        Boolean value = (Boolean) sendEvents.get();
+        boolean flag = true;
+        if (null == value) {
+            sendEvents.set(new Boolean(true));
+        } else {
+            flag = value.booleanValue();
+        }
+        if (flag) {
+            ByteArrayOutputStream out = new ByteArrayOutputStream(256);
+            try {
+                ObjectOutputStream stream = new ObjectOutputStream(out);
+                stream.writeObject(update);
+                stream.flush();
+            } catch (IOException e) {
+                String msg = "Could not write the event "
+                        + update.getCacheName() + " into the buffer";
+                log.error(msg, e);
+                throw new CacheCommunicationException(msg, e);
+            }
+            try {
+                byte data[] = out.toByteArray();
+                out.close();
+                ByteBuffer src = ByteBuffer.wrap(data);
+                eventPipe.sink().write(src);
+                src.clear();
+            } catch (IOException e) {
+                String msg = "Could not write the event for cache "
+                        + update.getCacheName();
+                log.error(msg, e);
+                throw new CacheCommunicationException(msg, e);
+            }
+        }
+    }
+
+    /*
+     * A remote peer has been updated, post the change here (non-Javadoc)
+     * 
+     * @see org.apache.geronimo.cache.comm.CacheChannel#recieve(org.apache.geronimo.cache.comm.RemoteEvent)
+     */
+    public void receive(RemoteEvent event) {
+        if (log.isDebugEnabled()) {
+            log.debug("Recieved update for " + event.getCacheId());
+        }
+        event.invoke(cache);
+    }
+
+    public void close() {
+        try {
+            sender.finished();
+            receiver.finished();
+            eventPipe.sink().close();
+            eventPipe.source().close();
+            awake = false;
+            sender = null;
+            receiver = null;
+            eventPipe = null;
+        } catch (IOException e) {
+            String msg = "Could not close channel for " + cache.getId();
+            log.error(msg, e);
+            throw new CacheException(msg, e);
+        }
+    }
+
+    public ReplicatedCache getCache() {
+        return cache;
+    }
+
+    public void setCache(ReplicatedCache cache) {
+        if (!awake) {
+            this.cache = cache;
+            sender = new ClusterSenderRunnable(eventPipe.source(), "sender");
+            receiver = new ClusterReceiverRunnable(cache, "receiver");
+            receiver.start();
+            sender.start();
+            awake = true;
+            RemoteEvent event = new JoinedEvent();
+            event.setCacheId(cache.getId());
+            event.setCacheName(cache.getName());
+            if (log.isDebugEnabled()) {
+                log.debug("joining " + cache.getId() + " to the cluster");
+            }
+            send(event);
+        } else {
+            // TODO: should this throw an exception?
+        }
+    }
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public void setGroupName(String hostName) {
+        this.groupName = hostName;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+}

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/CacheChannelUDPImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/CacheChannelUDPImpl.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/CacheChannelUDPImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterReceiverRunnable.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterReceiverRunnable.java?rev=418140&view=auto
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterReceiverRunnable.java (added)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterReceiverRunnable.java Thu Jun 29 14:20:49 2006
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.cache.comm.udpimpl;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.cache.ReplicatedCache;
+import org.apache.geronimo.cache.comm.CacheCommunicationException;
+import org.apache.geronimo.cache.comm.RemoteEvent;
+
+/**
+ * Reads data from the cluster and posts it locally to the cache so it can be in
+ * sync with the remote sender.
+ * 
+ * $Rev$ $Date$
+ */
+class ClusterReceiverRunnable extends Thread {
+	public Log log = LogFactory.getLog(ClusterReceiverRunnable.class);
+
+	private ReplicatedCache cache = null;
+
+	private boolean finished = false;
+
+	private MulticastSocket socket = null;
+
+	public ClusterReceiverRunnable(ReplicatedCache cache, String name) {
+		super(name);
+		this.cache = cache;
+	}
+
+	/**
+	 * This is ugly - make a better way
+	 */
+	public void finished() {
+		finished = true;
+		if (socket != null) {
+			socket.close();
+		}
+	}
+
+	/**
+	 * @throws 
+	 */
+	public void run() {
+		String host = CacheChannelUDPImpl.DEFAULT_GROUP;
+		int port = CacheChannelUDPImpl.DEFAULT_PORT; 
+		try {
+			InetAddress group = InetAddress.getByName(host);
+			socket = new MulticastSocket(port);
+			socket.joinGroup(group);
+		} catch (UnknownHostException e) {
+			String msg = "Could not find host " + host + ":" + port;
+			log.error(msg, e);
+			throw new CacheCommunicationException(msg, e);
+		} catch (IOException e) {
+			String msg = "Host " + host + " is not a multi-cast address";
+			log.error(msg, e);
+			throw new CacheCommunicationException(msg, e);
+		}
+		while (!finished) {
+			try {
+				// read the data
+				DatagramPacket recv = receiveDatagram();
+				// make it into an object
+				RemoteEvent event = convertToObject(recv);
+				// receive the event - but not on the cache that sent it
+				if (!cache.getId().equals(event.getCacheId())) {
+					cache.getChannel().receive((RemoteEvent) event);
+				} else {
+					if (log.isDebugEnabled()) {
+						log.debug("Ignored receipt for " + cache.getId());
+					}
+				}
+			} catch (SocketException e) {
+				if (!finished) {
+					String msg = "Error in underlying protocol";
+					log.error(msg, e);
+					throw new CacheCommunicationException(msg, e);
+				}
+				// if we are finished then we can ignore the exception
+			} catch (IOException e) {
+				String msg = "Error reading datagram";
+				log.error(msg, e);
+				throw new CacheCommunicationException(msg, e);
+			} catch (ClassNotFoundException e) {
+				String msg = "Error instanciating RemoteEvent";
+				log.error(msg, e);
+				throw new CacheCommunicationException(msg, e);
+			}
+		}
+
+	}
+
+	private DatagramPacket receiveDatagram() throws IOException {
+		byte[] buf = new byte[CacheChannelUDPImpl.UDP_MAX_PACKET_SIZE];
+		DatagramPacket recv = new DatagramPacket(buf, buf.length);
+		socket.receive(recv);
+		return recv;
+	}
+
+	private RemoteEvent convertToObject(DatagramPacket recv)
+			throws IOException, ClassNotFoundException {
+		ByteArrayInputStream in = new ByteArrayInputStream(recv.getData());
+		ObjectInputStream input = new ObjectInputStream(in);
+		RemoteEvent value = (RemoteEvent) input.readObject();
+		//value.setOrigin(recv.getSocketAddress());
+		return value;
+	}
+
+}

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterReceiverRunnable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterReceiverRunnable.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterReceiverRunnable.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterSenderRunnable.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterSenderRunnable.java?rev=418140&view=auto
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterSenderRunnable.java (added)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterSenderRunnable.java Thu Jun 29 14:20:49 2006
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.cache.comm.udpimpl;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.Pipe;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.Pipe.SourceChannel;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.cache.CacheConfigurationException;
+
+/**
+ * Reads data from the source side of a pipe and writes it to the cluster.
+ * 
+ * The local cluster writes events into the SinkChannel of the pipe, this
+ * runnable is blocked waiting for that write. Once the data is read it is
+ * written out to the cluster to be picked up and applied to the local caches of
+ * the various elements of the cluster.
+ * 
+ * $Rev$ $Date$
+ */
+class ClusterSenderRunnable extends Thread {
+	public Log log = LogFactory.getLog(ClusterSenderRunnable.class);
+
+	private Pipe.SourceChannel pipeSource;
+
+	private boolean finished = false;
+
+	public ClusterSenderRunnable(Pipe.SourceChannel channel, String name) {
+		super(name);
+		this.pipeSource = channel;
+		try {
+			pipeSource.configureBlocking(false);
+		} catch (IOException e) {
+			String msg = "Could not mark pipe source as non-blocking";
+			log.error(msg, e);
+			throw new CacheConfigurationException(msg, e);
+		}
+	}
+
+	public void finished() {
+		finished = true;
+	}
+
+	public void run() {
+		try {
+			Selector selector = null;
+			selector = Selector.open();
+			pipeSource.register(selector, SelectionKey.OP_READ);
+			ByteBuffer buffer = ByteBuffer
+					.allocate(CacheChannelUDPImpl.UDP_MAX_PACKET_SIZE);
+			while (!finished) {
+				getDataFromPipeAndSendEvent(buffer, selector);
+			}
+		} catch (ClosedChannelException e1) {
+			// TODO Auto-generated catch block
+			e1.printStackTrace();
+		} catch (SocketException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+
+	private void getDataFromPipeAndSendEvent(ByteBuffer buffer,
+			Selector selector) throws IOException, UnknownHostException {
+		buffer.clear();
+		// TODO: before continuing need to see if we have read all the
+		// data
+		// this blocks for 250milliseconds waiting for data to be written
+		// on the sink side of the pipe
+		int selectedCount = selector.select(250);
+		if (selectedCount != 0) {
+			InetAddress group = InetAddress
+					.getByName(CacheChannelUDPImpl.DEFAULT_GROUP);
+			MulticastSocket socket = new MulticastSocket(
+					CacheChannelUDPImpl.DEFAULT_PORT);
+			socket.joinGroup(group);
+			Iterator itr = selector.selectedKeys().iterator();
+			while (itr.hasNext()) {
+				SelectionKey key = (SelectionKey) itr.next();
+				SourceChannel channel = (SourceChannel) key.channel();
+				if (channel != pipeSource) {
+					continue;
+				}
+				channel.read(buffer);
+				buffer.flip();
+				byte data[] = buffer.array();
+				DatagramPacket hi = new DatagramPacket(data, buffer.limit(),
+						group, CacheChannelUDPImpl.DEFAULT_PORT);
+				socket.send(hi);
+				itr.remove();
+			}
+		}
+	}
+}

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterSenderRunnable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterSenderRunnable.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/comm/udpimpl/ClusterSenderRunnable.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/impl/LocalCacheImpl.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/impl/LocalCacheImpl.java?rev=418140&r1=418139&r2=418140&view=diff
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/impl/LocalCacheImpl.java (original)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/impl/LocalCacheImpl.java Thu Jun 29 14:20:49 2006
@@ -38,7 +38,7 @@
  * 
  * $Rev$ $Date$
  */
-public class LocalCacheImpl extends AbstractCache implements LocalCache {
+public class LocalCacheImpl implements LocalCache {
 	public final String DEFAULT_CACHE_NAME = "DefaultCacheName";
 
 	private CacheStore store = null;
@@ -211,13 +211,15 @@
 	}
 
 	public synchronized void putAll(Map t) {
-		Set keySet = t.keySet();
-		checkLocks(keySet);
-		// TODO: this is a lazy way to do things
-		Iterator itr = keySet.iterator();
-		while (itr.hasNext()) {
-			Object key = itr.next();
-			put(key, t.get(key));
+		if (t != null) {
+			Set keySet = t.keySet();
+			checkLocks(keySet);
+			// TODO: this is a lazy way to do things
+			Iterator itr = keySet.iterator();
+			while (itr.hasNext()) {
+				Object key = itr.next();
+				put(key, t.get(key));
+			}
 		}
 	}
 

Modified: geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/impl/ReplicatedCacheImpl.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/impl/ReplicatedCacheImpl.java?rev=418140&r1=418139&r2=418140&view=diff
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/impl/ReplicatedCacheImpl.java (original)
+++ geronimo/sandbox/geronimo-cache/src/main/java/org/apache/geronimo/cache/impl/ReplicatedCacheImpl.java Thu Jun 29 14:20:49 2006
@@ -373,4 +373,8 @@
 		}
 	}
 
+    
+    public String toString() {
+        return getName();
+    }
 }

Modified: geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/MockCacheChannel.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/MockCacheChannel.java?rev=418140&r1=418139&r2=418140&view=diff
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/MockCacheChannel.java (original)
+++ geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/MockCacheChannel.java Thu Jun 29 14:20:49 2006
@@ -35,7 +35,7 @@
 		return null;
 	}
 
-	public void recieve(RemoteEvent update) {
+	public void receive(RemoteEvent update) {
 		receiveCount++;
 	}
 

Modified: geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/RunCacheStuff.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/RunCacheStuff.java?rev=418140&r1=418139&r2=418140&view=diff
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/RunCacheStuff.java (original)
+++ geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/RunCacheStuff.java Thu Jun 29 14:20:49 2006
@@ -26,8 +26,8 @@
 import java.util.Iterator;
 
 import org.apache.geronimo.cache.ReplicatedCache;
-import org.apache.geronimo.cache.comm.impl.CacheCommListener;
-import org.apache.geronimo.cache.comm.impl.CacheChannelUDPImpl;
+import org.apache.geronimo.cache.comm.CacheCommListener;
+import org.apache.geronimo.cache.comm.udpimpl.CacheChannelUDPImpl;
 import org.apache.geronimo.cache.impl.ReplicatedCacheImpl;
 
 /**

Added: geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/amqimpl/AMQImplChannelTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/amqimpl/AMQImplChannelTest.java?rev=418140&view=auto
==============================================================================
--- geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/amqimpl/AMQImplChannelTest.java (added)
+++ geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/amqimpl/AMQImplChannelTest.java Thu Jun 29 14:20:49 2006
@@ -0,0 +1,84 @@
+package org.apache.geronimo.cache.comm.amqimpl;
+
+import javax.jms.JMSException;
+import junit.framework.TestCase;
+import org.apache.geronimo.cache.Cache;
+import org.apache.geronimo.cache.ReplicatedCache;
+import org.apache.geronimo.cache.comm.CacheCommListener;
+import org.apache.geronimo.cache.impl.ReplicatedCacheImpl;
+import edu.emory.mathcs.backport.java.util.concurrent.Callable;
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Starts several replicated caches on the AMQ channel from separate threads,
+ * has each one put its own keys, and checks that every cache ends up with the
+ * same entries.
+ */
+public class AMQImplChannelTest extends TestCase {
+    private static final int CACHE_COUNT = 5;
+    private final int SAMPLES = 10;
+    private final String KEY_ROOT = "key:";
+    private final String VALUE_ROOT = "value:";
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(AMQImplChannelTest.class);
+    }
+
+    public AMQImplChannelTest(String name) {
+        super(name);
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    public void testMultipleCaches() throws Exception {
+        final CountDownLatch cachesEstablished = new CountDownLatch(CACHE_COUNT);
+        final CountDownLatch done = new CountDownLatch(CACHE_COUNT);
+        final AtomicInteger cacheNumber = new AtomicInteger(0);
+        // counts worker threads that threw, so the test fails instead of hanging
+        final AtomicInteger failures = new AtomicInteger(0);
+        final Callable cacheCallable = new Callable() {
+            public Object call() throws JMSException, InterruptedException {
+                final int counter = cacheNumber.getAndIncrement();
+                ReplicatedCache cache;
+                try {
+                    CacheChannelAMQImpl channel = new CacheChannelAMQImpl();
+                    cache = new ReplicatedCacheImpl(channel, "Cache:" + counter);
+                    CacheCommListener listener = new CacheCommListener(channel);
+                    cache.addListener(listener);
+                } finally {
+                    // released even on failure so the other workers are not
+                    // left waiting for a cache that will never appear
+                    cachesEstablished.countDown();
+                }
+                // wait for everyone to finish starting up
+                cachesEstablished.await();
+                for (int i = 0; i < SAMPLES; i++) {
+                    cache.put((KEY_ROOT + counter) + ":" + i, VALUE_ROOT + i);
+                }
+                return cache;
+            }
+        };
+        final Cache caches[] = new Cache[CACHE_COUNT];
+        for (int i = 0; i < CACHE_COUNT; i++) {
+            final int num = i;
+            new Thread("CacheThread:" + i) {
+                public void run() {
+                    try {
+                        caches[num] = (Cache) cacheCallable.call();
+                    } catch (Throwable e) {
+                        failures.incrementAndGet();
+                        e.printStackTrace();
+                    } finally {
+                        // always counted, otherwise done.await() never returns
+                        done.countDown();
+                    }
+                }
+            }.start();
+        }
+        done.await();
+        assertEquals("cache threads that failed", 0, failures.get());
+        // wait 5 seconds for all the caches to catch up with the pending messages
+        Thread.sleep(5000);
+        // assert each cache is equal to the other caches
+        for (int i = 0; i < CACHE_COUNT; i++) {
+            for (int j = 0; j < CACHE_COUNT; j++) {
+                if (j == i) {
+                    continue;
+                }
+                assertEquals(caches[i].entrySet(), caches[j].entrySet());
+            }
+        }
+    }
+
+}

Propchange: geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/amqimpl/AMQImplChannelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/amqimpl/AMQImplChannelTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/geronimo-cache/src/test/java/org/apache/geronimo/cache/comm/amqimpl/AMQImplChannelTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message