Return-Path: Delivered-To: apmail-geronimo-scm-archive@www.apache.org Received: (qmail 74467 invoked from network); 9 Oct 2006 23:51:28 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 9 Oct 2006 23:51:28 -0000 Received: (qmail 30797 invoked by uid 500); 9 Oct 2006 23:51:28 -0000 Delivered-To: apmail-geronimo-scm-archive@geronimo.apache.org Received: (qmail 30659 invoked by uid 500); 9 Oct 2006 23:51:27 -0000 Mailing-List: contact scm-help@geronimo.apache.org; run by ezmlm Precedence: bulk list-help: list-unsubscribe: List-Post: Reply-To: dev@geronimo.apache.org List-Id: Delivered-To: mailing list scm@geronimo.apache.org Received: (qmail 30648 invoked by uid 99); 9 Oct 2006 23:51:27 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Oct 2006 16:51:27 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Oct 2006 16:51:25 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id ED4CB1A981A; Mon, 9 Oct 2006 16:51:04 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r454556 - in /geronimo/sandbox/gcache/server/src: main/java/org/apache/geronimo/gcache/ main/java/org/apache/geronimo/gcache/transports/ main/java/org/apache/geronimo/gcache/transports/tcp/ test/java/org/apache/geronimo/gcache/transports/tcp/ Date: Mon, 09 Oct 2006 23:51:04 -0000 To: scm@geronimo.apache.org From: jgenender@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061009235104.ED4CB1A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: jgenender Date: Mon Oct 9 16:51:03 2006 New Revision: 454556 URL: http://svn.apache.org/viewvc?view=rev&rev=454556 Log: Add the Endpoint processing Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java (with props) geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java (with props) geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java (with props) geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java (with props) geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java (with props) Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java?view=diff&rev=454556&r1=454555&r2=454556 ============================================================================== --- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java (original) +++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java Mon Oct 9 16:51:03 2006 @@ -18,17 +18,25 @@ */ package org.apache.geronimo.gcache; -import net.sf.ehcache.CacheManager; import net.sf.ehcache.Cache; +import net.sf.ehcache.CacheManager; + import org.apache.geronimo.gcache.server.listeners.CacheNotifier; import org.apache.geronimo.gcache.server.listeners.DefaultCacheNotifier; +import org.apache.geronimo.gcache.transports.EndpointManager; public class CacheInfoHolder { private final CacheManager cacheManager; private CacheNotifier cacheNotifier = null; + private EndpointManager endpointManager; public CacheInfoHolder(CacheManager cacheManager) { this.cacheManager = cacheManager; + this.endpointManager = new EndpointManager(); + } + + public EndpointManager getEndpointManager() { + return endpointManager; } public CacheManager getCacheManager() { Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java?view=auto&rev=454556 ============================================================================== --- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java (added) +++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java Mon Oct 9 16:51:03 2006 @@ -0,0 +1,22 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geronimo.gcache.transports; + +public interface Endpoint { + +} Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java?view=auto&rev=454556 ============================================================================== --- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java (added) +++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java Mon Oct 9 16:51:03 2006 @@ -0,0 +1,48 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geronimo.gcache.transports; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public class EndpointManager { + + Set endpoints = Collections.synchronizedSet(new HashSet()); + + public void addEndpoint(Endpoint endpoint){ + endpoints.add(endpoint); + } + + public void removeEndpoint(Endpoint endpoint){ + endpoints.remove(endpoint); + } + + public boolean contains(Endpoint endpoint){ + return endpoints.contains(endpoint); + } + + public int size(){ + return endpoints.size(); + } + + public Set getEndpoints() { + return endpoints; + } + +} Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java?view=diff&rev=454556&r1=454555&r2=454556 ============================================================================== --- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java (original) +++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java Mon Oct 9 16:51:03 2006 @@ -94,8 +94,9 @@ bcr.reset(channel, commandBuffer); count = bcr.readBuffer(length); if (count < 0) { + //Remove the endpoint from the list of clients + infoHolder.getEndpointManager().removeEndpoint(new TCPEndpoint(channel)); channel.close(); - //TODO - remove the peer as the socket was closed return; } if (count < length) { Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java?view=diff&rev=454556&r1=454555&r2=454556 ============================================================================== --- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java (original) +++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java Mon Oct 9 16:51:03 2006 @@ -18,11 +18,22 @@ */ package org.apache.geronimo.gcache.transports.tcp; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import net.sf.ehcache.Cache; import net.sf.ehcache.Element; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.geronimo.gcache.CacheInfoHolder; +import org.apache.geronimo.gcache.command.BaseCommand; import org.apache.geronimo.gcache.command.BulkSendCommand; import org.apache.geronimo.gcache.command.ClearCacheCommand; import org.apache.geronimo.gcache.command.GetCacheCommand; @@ -34,149 +45,195 @@ import org.apache.geronimo.gcache.transports.CommandVisitor; import org.apache.geronimo.gcache.util.BufferChannelWriter; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - public class TCPCommandVisitor implements CommandVisitor { Log log = LogFactory.getLog(TCPCommandVisitor.class); private CacheInfoHolder infoHolder; + private SelectionKey key; public TCPCommandVisitor(CacheInfoHolder infoHolder, SelectionKey key) { - this.key = key; - this.infoHolder = infoHolder; + this.key = key; + this.infoHolder = infoHolder; } public void processRemoveSession(RemoveSessionCommand command) { - Cache cache = infoHolder.getCache(command.getCacheName(), true); + Cache cache = infoHolder.getCache(command.getCacheName(), true); - //Be sure a session was sent - if (command.hasSession()) { - cache.remove(command.getSessionId()); - } + // Be sure a session was sent + if (command.hasSession()) { + cache.remove(command.getSessionId()); + } - //Notify peers - infoHolder.getCacheNotifier().notifyRemoveSession(command); + // Notify peers + infoHolder.getCacheNotifier().notifyRemoveSession(command); } + @SuppressWarnings("unchecked") public void processRemoveEntry(RemoveEntryCommand command) { + Cache cache = infoHolder.getCache(command.getCacheName(), true); - Cache cache = infoHolder.getCache(command.getCacheName(), true); - - //Check if we are using sessions - if (command.hasSession()) { + // Check if we are using sessions + if (command.hasSession()) { - Map sessionMap = null; + Map sessionMap = null; - //We are so use the session maps that is stored - Element element = cache.get(command.getSessionId()); - if (element != null) { - sessionMap = (Map) element.getObjectValue(); - } else { - sessionMap = Collections.synchronizedMap(new HashMap()); - } + // We are so use the session maps that is stored + Element element = cache.get(command.getSessionId()); + if (element != null) { + sessionMap = (Map) element.getObjectValue(); + } else { + sessionMap = Collections.synchronizedMap(new HashMap()); + } - sessionMap.remove(command.getHashableKey()); + sessionMap.remove(command.getHashableKey()); - //Put the session away - cache.put(new Element(command.getSessionId(), sessionMap)); + // Put the session away + cache.put(new Element(command.getSessionId(), sessionMap)); - } else { + } else { - //No session map so store the value - cache.remove(command.getHashableKey()); - } + // No session map so store the value + cache.remove(command.getHashableKey()); + } - //Notify peers - infoHolder.getCacheNotifier().notifyRemove(command); + // Notify peers + infoHolder.getCacheNotifier().notifyRemove(command); } public void processPutSession(PutSessionCommand command) { - Cache cache = infoHolder.getCache(command.getCacheName(), true); + Cache cache = infoHolder.getCache(command.getCacheName(), true); - //Place the raw session in the cache - try { - cache.put(new Element(command.getSessionId(), command.getRawSessionFromPayload())); - - //Ack the message - MessageAckCommand ack = new MessageAckCommand(); - ack.setMessageId(command.getCommandId()); - byte [] packet = ack.createPacket(true); - int written = sendPacket(packet); - if (written == -1) { - //TODO - This means the socket is dead and need peer removal - } - - } catch (IOException e) { - log.info(e); - } + // Place the raw session in the cache + try { + cache.put(new Element(command.getSessionId(), command + .getRawSessionFromPayload())); + + // Ack the message + MessageAckCommand ack = new MessageAckCommand(); + ack.setMessageId(command.getCommandId()); + byte[] packet = ack.createPacket(true); + sendPacket(packet); + + } catch (IOException e) { + // TODO - What should we do on an IOException, ignore it or + // remove the client? + log.error(e); + } - //Notify peers - infoHolder.getCacheNotifier().notifyPutSession(command); + // Notify peers + infoHolder.getCacheNotifier().notifyPutSession(command); } + @SuppressWarnings({ "unchecked" }) public void processPutEntry(PutEntryCommand command) { - Cache cache = infoHolder.getCache(command.getCacheName(), true); - - //Check if we are using sessions - if (command.hasSession()) { - - Map sessionMap = null; - - //We are so use the session maps that is stored - Element element = cache.get(command.getSessionId()); - if (element != null) { - sessionMap = (Map) element.getObjectValue(); - } else { - sessionMap = Collections.synchronizedMap(new HashMap()); - } + Cache cache = infoHolder.getCache(command.getCacheName(), true); - sessionMap.put(command.getHashableKey(), command.getRawPayload()); - //Put the session away - cache.put(new Element(command.getSessionId(), sessionMap)); + // Check if we are using sessions + if (command.hasSession()) { - } else { + Map sessionMap = null; - //No session map so store the value - cache.put(new Element(command.getHashableKey(), command.getRawPayload())); - } + // We are so use the session maps that is stored + Element element = cache.get(command.getSessionId()); + if (element != null) { + sessionMap = (Map) element.getObjectValue(); + } else { + sessionMap = Collections.synchronizedMap(new HashMap()); + } + + sessionMap.put(command.getHashableKey(), command.getRawPayload()); + // Put the session away + cache.put(new Element(command.getSessionId(), sessionMap)); + + } else { + + // No session map so store the value + cache.put(new Element(command.getHashableKey(), command + .getRawPayload())); + } - //Notify peers - infoHolder.getCacheNotifier().notifyPut(command); + // Notify peers + infoHolder.getCacheNotifier().notifyPut(command); } public void processMessageAck(MessageAckCommand command) { } + @SuppressWarnings("unchecked") public void processGetCache(GetCacheCommand command) { + Cache cache = infoHolder.getCache(command.getCacheName(), true); + + //Add the client endpoint + infoHolder.getEndpointManager().addEndpoint(new TCPEndpoint(key)); + + //Send a bulk command + BulkSendCommand bulk = new BulkSendCommand(); + bulk.setNumberOfCommands(cache.getSize()); + try { + if (sendPacket(bulk.createPacket(true)) < 0) { + return; + } + + for (Object key : (List) cache.getKeys()) { + Element element = cache.get(key); + Object payload = element.getValue(); + + BaseCommand newCommand = null; + // Test if we are sending a session or not + if (payload instanceof HashMap) { + PutSessionCommand psc = new PutSessionCommand(); + psc.setCacheName(command.getCacheName()); + psc.setSessionId((String) key); + psc.setPayloadFromSession((Map) payload); + newCommand = psc; + } else { + PutEntryCommand pec = new PutEntryCommand(); + pec.setCacheName(command.getCacheName()); + pec.setRawPayload((byte[]) payload); + pec.setRawKey((byte [])key); + newCommand = pec; + } + + //Send the packet. If there is a failure just abort + if (sendPacket(newCommand.createPacket(false)) < 0) + return; + } + + + } catch (IOException e) { + // TODO - What should we do on an IOException, ignore it or + // remove the client? + log.error(e); + } } public void processClearCache(ClearCacheCommand command) { - Cache cache = infoHolder.getCache(command.getCacheName(), true); - cache.removeAll(); + Cache cache = infoHolder.getCache(command.getCacheName(), true); + cache.removeAll(); - //Notify peers - infoHolder.getCacheNotifier().notifyClearCache(command); + // Notify peers + infoHolder.getCacheNotifier().notifyClearCache(command); } public void processBulkSend(BulkSendCommand command) { } - private int sendPacket(byte []packet) throws IOException { - //This line if for tests...key should not be null - if (key == null) - return 0; - SocketChannel channel = (SocketChannel) key.channel(); - BufferChannelWriter bcw = new BufferChannelWriter(channel, ByteBuffer.wrap(packet)); - int written = bcw.writeBuffer(packet.length); - return written; + private int sendPacket(byte[] packet) throws IOException { + // This line if for tests...key should not be null + if (key == null) + return 0; + SocketChannel channel = (SocketChannel) key.channel(); + BufferChannelWriter bcw = new BufferChannelWriter(channel, ByteBuffer + .wrap(packet)); + int written = bcw.writeBuffer(packet.length); + if (written == -1) { + // Remove the endpoint from the list of clients + infoHolder.getEndpointManager().removeEndpoint( + new TCPEndpoint(channel)); + } + return written; } } Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java?view=auto&rev=454556 ============================================================================== --- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java (added) +++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java Mon Oct 9 16:51:03 2006 @@ -0,0 +1,53 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geronimo.gcache.transports.tcp; + +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +import org.apache.geronimo.gcache.transports.Endpoint; + +public class TCPEndpoint extends Object implements Endpoint{ + + private SocketChannel channel; + + public TCPEndpoint(SocketChannel channel) { + this.channel = channel; + } + + //Convenience constructor to extract channel from the key + public TCPEndpoint(SelectionKey key) { + channel = (SocketChannel) key.channel(); + } + + public SocketChannel getChannel() { + return channel; + } + + @Override + public boolean equals(Object obj) { + return channel.socket().getInetAddress().equals(obj); + } + + @Override + public int hashCode() { + return channel.socket().getInetAddress().hashCode(); + } + + +} Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java?view=auto&rev=454556 ============================================================================== --- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java (added) +++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java Mon Oct 9 16:51:03 2006 @@ -0,0 +1,139 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geronimo.gcache.transports.tcp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SocketChannel; +import java.util.Arrays; +import java.util.zip.CRC32; +import java.util.zip.Checksum; + +import net.sf.ehcache.CacheManager; + +import org.apache.geronimo.gcache.CacheInfoHolder; +import org.apache.geronimo.gcache.command.BaseCommand; +import org.apache.geronimo.gcache.command.Command; +import org.apache.geronimo.gcache.command.CommandTypes; +import org.apache.geronimo.gcache.server.impl.DefaultThreadPoolImpl; +import org.apache.geronimo.gcache.server.spi.ThreadPool; +import org.apache.geronimo.gcache.transports.TransportServer; +import org.apache.geronimo.gcache.util.BufferChannelReader; +import org.apache.geronimo.gcache.util.BufferChannelWriter; +import org.apache.geronimo.gcache.util.ByteArrayInputStream; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeSuite; + +public abstract class AbstractServer { + protected static final int port = 45678; + protected static final String host = "localhost"; + + protected TransportServer server = null; + protected ThreadPool pool; + protected SocketChannel clientChannel; + protected CacheInfoHolder info; + + @BeforeClass + public void setUp() throws Exception{ + pool = new DefaultThreadPoolImpl(10); + CacheManager mgr = CacheManager.create(); + info = new CacheInfoHolder(mgr); + DefaultSelectionKeyProcessorFactory factory = new DefaultSelectionKeyProcessorFactory(info); + + server = new TCPSocketTransportServer(host, port, pool, 2000, factory); + + server.start(); + + //Create a client socket + clientChannel = SocketChannel.open(); + clientChannel.connect(new InetSocketAddress(host, port)); + } + + @AfterClass(alwaysRun=true) + public void shutdown() throws Exception{ + server.stop(); + pool.shutdown(); + + clientChannel.close(); + } + + protected void sendCommand(BaseCommand command) throws IOException{ + + byte bytes[] = command.createPacket(true); + BufferChannelWriter bcw = new BufferChannelWriter(clientChannel, ByteBuffer.wrap(bytes)); + int written = bcw.writeBuffer(bytes.length); + assert written == bytes.length; + + } + + protected Command readCommand() throws IOException{ + + ByteBuffer receiveHeader = ByteBuffer.allocate(Constants.MAGIC.length + 13); + BufferChannelReader bcr = new BufferChannelReader(clientChannel, receiveHeader); + int read = bcr.readBuffer(receiveHeader.capacity()); + assert read == Constants.MAGIC.length + 13; + receiveHeader.flip(); + + //Read the magic + byte magic[] = new byte[Constants.MAGIC.length]; + receiveHeader.get(magic); + + //Better match the Magic + assert Arrays.equals(Constants.MAGIC, magic); + + //Get the command + byte commandIdentifier = receiveHeader.get(); + + //Get the checksum + long checksum = receiveHeader.getLong(); + + //Get the command length + int length = receiveHeader.getInt(); + + //pull the command + ByteBuffer commandBuffer = ByteBuffer.allocate(length); + bcr.reset(clientChannel, commandBuffer); + int count = bcr.readBuffer(length); + assert count == length; + + //Calc a checksum + byte commandArray[] = commandBuffer.array(); + Checksum calcChecksum = new CRC32(); + calcChecksum.update(commandArray, 0, commandArray.length); + long newCheck = calcChecksum.getValue(); + + //Checksums match + assert newCheck == checksum; + + //Now create the command + ByteArrayInputStream bias = new ByteArrayInputStream(commandBuffer.array()); + ReadableByteChannel readChannel = Channels.newChannel(bias); + + //Create the command and unmarshal the data + Command command = CommandTypes.createCommand(commandIdentifier); + command.readExternal(readChannel); + + return command; + + } +} Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java?view=auto&rev=454556 ============================================================================== --- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java (added) +++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java Mon Oct 9 16:51:03 2006 @@ -0,0 +1,67 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geronimo.gcache.transports.tcp; + +import java.nio.channels.SocketChannel; +import java.util.Set; + +import org.apache.geronimo.gcache.command.BulkSendCommand; +import org.apache.geronimo.gcache.command.Command; +import org.apache.geronimo.gcache.command.GetCacheCommand; +import org.apache.geronimo.gcache.transports.Endpoint; +import org.testng.annotations.Test; + +public class TCPEndpointTest extends AbstractServer { + + @Test + public void testJoinCluster() throws Exception{ + + assert 0 == info.getEndpointManager().size(); + + GetCacheCommand command = new GetCacheCommand(); + + command.setCacheName("Cache1"); + + //Send the packet + sendCommand(command); + + //Now receive any data (it Should be a BulkSendCommand) + Command bulk = this.readCommand(); + + //Is the message the type we think it is? + assert bulk instanceof BulkSendCommand; + + int commandsToFollow = ((BulkSendCommand)bulk).getNumberOfCommands(); + + //Nothing is in the Cache, so no commands should follow + assert commandsToFollow == 0; + + //Should have one client + assert 1 == info.getEndpointManager().size(); + + Set set = info.getEndpointManager().getEndpoints(); + TCPEndpoint endpoint = (TCPEndpoint)set.iterator().next(); + SocketChannel endpointChannel = endpoint.getChannel(); + + //Check that the socket addresses match (Remote on server == Local for client) + assert endpointChannel.socket().getRemoteSocketAddress().equals(clientChannel.socket().getLocalSocketAddress()); + + + } + +} Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java?view=diff&rev=454556&r1=454555&r2=454556 ============================================================================== --- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java (original) +++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java Mon Oct 9 16:51:03 2006 @@ -16,50 +16,22 @@ */ package org.apache.geronimo.gcache.transports.tcp; -import net.sf.ehcache.CacheManager; -import org.apache.geronimo.gcache.CacheInfoHolder; -import org.apache.geronimo.gcache.util.BufferChannelReader; -import org.apache.geronimo.gcache.command.PutSessionCommand; -import org.apache.geronimo.gcache.command.BaseCommand; -import org.apache.geronimo.gcache.command.Command; -import org.apache.geronimo.gcache.command.CommandTypes; -import org.apache.geronimo.gcache.command.MessageAckCommand; -import org.apache.geronimo.gcache.server.impl.DefaultThreadPoolImpl; -import org.apache.geronimo.gcache.server.spi.ThreadPool; -import org.apache.geronimo.gcache.transports.TransportServer; -import org.testng.annotations.AfterSuite; -import org.testng.annotations.BeforeSuite; -import org.testng.annotations.Test; - -import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.Channels; import java.util.HashMap; import java.util.Map; -import java.util.Arrays; -import java.util.zip.Checksum; -import java.util.zip.CRC32; -import java.io.ByteArrayInputStream; - -public class TcpSocketServerTest { - private static final int port = 45678; - private static final String host = "localhost"; +import org.apache.geronimo.gcache.command.Command; +import org.apache.geronimo.gcache.command.MessageAckCommand; +import org.apache.geronimo.gcache.command.PutSessionCommand; +import org.testng.annotations.Test; - TransportServer server = null; - ThreadPool pool; +public class TcpSocketServerTest extends AbstractServer { @Test() - public void sendSession() throws Exception { - - //Create a client socket - SocketChannel channel = SocketChannel.open(); - channel.connect(new InetSocketAddress(host, port)); + public void sendSession() throws Exception { //Create a session - Map session = new HashMap(); + Map session = new HashMap(); session.put("key1","data1"); session.put("key2","data2"); session.put("key3","data3"); @@ -70,53 +42,10 @@ //Send the packet ByteBuffer commandBuffer = ByteBuffer.wrap(command.createPacket(true)); - channel.write(commandBuffer); + clientChannel.write(commandBuffer); //Now receive any data (it Should be a MessageAck) - ByteBuffer receiveHeader = ByteBuffer.allocate(Constants.MAGIC.length + 13); - BufferChannelReader bcr = new BufferChannelReader(channel, receiveHeader); - int read = bcr.readBuffer(receiveHeader.capacity()); - assert read == Constants.MAGIC.length + 13; - receiveHeader.flip(); - - //Read the magic - byte magic[] = new byte[Constants.MAGIC.length]; - receiveHeader.get(magic); - - //Better match the Magic - assert Arrays.equals(Constants.MAGIC, magic); - - //Get the command - byte commandIdentifier = receiveHeader.get(); - - //Get the checksum - long checksum = receiveHeader.getLong(); - - //Get the command length - int length = receiveHeader.getInt(); - - //pull the command - commandBuffer = ByteBuffer.allocate(length); - bcr.reset(channel, commandBuffer); - int count = bcr.readBuffer(length); - assert count == length; - - //Calc a checksum - byte commandArray[] = commandBuffer.array(); - Checksum calcChecksum = new CRC32(); - calcChecksum.update(commandArray, 0, commandArray.length); - long newCheck = calcChecksum.getValue(); - - //Checksums match - assert newCheck == checksum; - - //Now create the command - ByteArrayInputStream bias = new ByteArrayInputStream(commandBuffer.array()); - ReadableByteChannel readChannel = Channels.newChannel(bias); - - //Create the command and unmarshal the data - Command ackCommand = CommandTypes.createCommand(commandIdentifier); - ackCommand.readExternal(readChannel); + Command ackCommand = this.readCommand(); //Is the message the type we think it is? assert ackCommand instanceof MessageAckCommand; @@ -125,24 +54,5 @@ assert command.getCommandId() == ((MessageAckCommand)ackCommand).getMessageId(); } - - - @BeforeSuite - public void setUp() throws Exception{ - pool = new DefaultThreadPoolImpl(10); - CacheManager mgr = CacheManager.create(); - CacheInfoHolder info = new CacheInfoHolder(mgr); - DefaultSelectionKeyProcessorFactory factory = new DefaultSelectionKeyProcessorFactory(info); - - server = new TCPSocketTransportServer(host, port, pool, 2000, factory); - - server.start(); - } - - @AfterSuite(alwaysRun=true) - public void shutdown() throws Exception{ - server.stop(); - pool.shutdown(); - } }