Subject: svn commit: r1136961 - in /avro/branches/branch-1.5: ./ lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/ lang/java/ipc/src/main/java/org/apache/avro/ipc/ lang/java/ipc/src/main/java/org/apache/avro/ipc/gene...
Date: Fri, 17 Jun 2011 17:55:22 -0000
To: commits@avro.apache.org
From: cutting@apache.org

Author: cutting
Date: Fri Jun 17 17:55:21 2011
New Revision: 1136961

URL: http://svn.apache.org/viewvc?rev=1136961&view=rev
Log:
Merge r1099257 and r1136342 from trunk.
Fixes: AVRO-815 and AVRO-539.

Added:
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java
      - copied unchanged from r1136342, avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java
      - copied unchanged from r1136342, avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java
    avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java
      - copied unchanged from r1136342, avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java
    avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
      - copied unchanged from r1136342, avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
Modified:
    avro/branches/branch-1.5/ (props changed)
    avro/branches/branch-1.5/CHANGES.txt
    avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java
    avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
    avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java
    avro/branches/branch-1.5/share/test/schemas/mail.avpr
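
Editorial note: AVRO-539 adds asynchronous RPC through either callbacks or futures, and the diff below calls handleResult and handleError on its callbacks. The following is a minimal, self-contained sketch of that pattern only. It does not use the Avro library; SketchCallback, SketchCallFuture, ping and demo are hypothetical names invented for illustration, and the real Callback.java and CallFuture.java added by this commit may differ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

final class AsyncRpcSketch {
  /** Hypothetical stand-in for a callback with a result path and an error path. */
  interface SketchCallback<T> {
    void handleResult(T result);
    void handleError(Throwable error);
  }

  /** Hypothetical future that is completed through the callback methods. */
  static final class SketchCallFuture<T> implements SketchCallback<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private T result;
    private Throwable error;

    @Override public void handleResult(T result) {
      this.result = result;
      latch.countDown(); // publish the result and wake any waiter
    }

    @Override public void handleError(Throwable error) {
      this.error = error;
      latch.countDown(); // publish the error and wake any waiter
    }

    /** Blocks until a result or an error has been delivered. */
    T get() throws InterruptedException, ExecutionException {
      latch.await();
      if (error != null) throw new ExecutionException(error);
      return result;
    }
  }

  /** Hypothetical asynchronous call: the reply arrives on another thread. */
  static void ping(SketchCallback<String> callback) {
    new Thread(() -> callback.handleResult("pong")).start();
  }

  /** Runs the call once in callback style and once in future style. */
  static String demo() {
    try {
      // Callback style: the caller supplies the code that handles the reply.
      StringBuilder seen = new StringBuilder();
      CountDownLatch done = new CountDownLatch(1);
      ping(new SketchCallback<String>() {
        @Override public void handleResult(String r) { seen.append(r); done.countDown(); }
        @Override public void handleError(Throwable t) { seen.append("error"); done.countDown(); }
      });
      done.await();

      // Future style: pass the future as the callback, then wait on it.
      SketchCallFuture<String> future = new SketchCallFuture<>();
      ping(future);
      return "callback:" + seen + " future:" + future.get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints "callback:pong future:pong"
  }
}
```

Because the future in this sketch is itself a callback, one asynchronous entry point can serve both styles, which matches the Requestor javadoc in this commit: create a CallFuture, pass it as the Callback parameter, then wait on it.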
Propchange: avro/branches/branch-1.5/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Jun 17 17:55:21 2011 @@ -1 +1 @@ -/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493,1095529,1095548,1095550,1096798,1097916,1097927,1097968,1097974,1102332,1102335,1124127,1124971,1129053,1129071,1129697-1129706,1129729,1130503 +/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493,1095529,1095548,1095550,1096798,1097916,1097927,1097968,1097974,1099257,1102332,1102335,1124127,1124971,1129053,1129071,1129697-1129706,1129729,1130503,1136342 Modified: avro/branches/branch-1.5/CHANGES.txt URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/CHANGES.txt?rev=1136961&r1=1136960&r2=1136961&view=diff ============================================================================== --- avro/branches/branch-1.5/CHANGES.txt (original) +++ avro/branches/branch-1.5/CHANGES.txt Fri Jun 17 17:55:21 2011 @@ -8,6 +8,9 @@ Avro 1.5.2 (unreleased) streaming jobs to easily write Avro format output with "bytes" as schema. (Tom White via cutting) + AVRO-539. Java: Add asynchronous RPC support, through either + callbacks or futures. (James Baldassari via cutting) + IMPROVEMENTS AVRO-469. C: Set library's libtool-style soversion when using CMake @@ -34,6 +37,10 @@ Avro 1.5.2 (unreleased) AVRO-832. Java: Fix RPC client to correctly perform schema resolution on message responses. (cutting) + AVRO-815. Java: Netty Transceiver fails processing one-way messages. + Implemented writeBuffers for the NettyTransceiver to allow it to + process one-way messages. 
+ Avro 1.5.1 (3 May 2011) NEW FEATURES Modified: avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm?rev=1136961&r1=1136960&r2=1136961&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm (original) +++ avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm Fri Jun 17 17:55:21 2011 @@ -21,7 +21,7 @@ package $protocol.getNamespace(); @SuppressWarnings("all") #if ($protocol.getDoc()) - /** $protocol.getDoc() */ +/** $protocol.getDoc() */ #end public interface $this.mangle($protocol.getName()) { public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("${this.javaEscape($protocol.toString())}"); @@ -46,4 +46,31 @@ public interface $this.mangle($protocol. #end## (one way) ; #end## (requests) -} + +## Generate nested callback API + @SuppressWarnings("all") +#if ($protocol.getDoc()) + /** $protocol.getDoc() */ +#end + public interface Callback extends $this.mangle($protocol.getName()) { + public static final org.apache.avro.Protocol PROTOCOL = #if ($protocol.getNamespace())$protocol.getNamespace().#end${this.mangle($protocol.getName())}.PROTOCOL; +#foreach ($e in $protocol.getMessages().entrySet()) +#set ($name = $e.getKey()) +#set ($message = $e.getValue()) +#set ($response = $message.getResponse()) +## Generate callback method if the message is not one-way: +#if (! 
$message.isOneWay()) +#if ($message.getDoc()) + /** $this.escapeForJavadoc($message.getDoc()) */ +#end + void ${this.mangle($name)}(## +#foreach ($p in $message.getRequest().getFields())## +#* *#${this.javaUnbox($p.schema())} ${this.mangle($p.name())}#if ($velocityHasNext), #end +#end +#if ($message.getRequest().getFields().size() > 0), #end +org.apache.avro.ipc.Callback<${this.javaType($response)}> callback) throws java.io.IOException; +#end## (generate callback method) +#end## (requests) + }## End of Callback interface + +}## End of protocol interface Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java?rev=1136961&r1=1136960&r2=1136961&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java (original) +++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java Fri Jun 17 17:55:21 2011 @@ -54,13 +54,13 @@ public class NettyServer implements Serv private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class .getName()); - private Responder responder; + private final Responder responder; - private Channel serverChannel; - private ChannelGroup allChannels = new DefaultChannelGroup( + private final Channel serverChannel; + private final ChannelGroup allChannels = new DefaultChannelGroup( "avro-netty-server"); - private ChannelFactory channelFactory; - private CountDownLatch closed = new CountDownLatch(1); + private final ChannelFactory channelFactory; + private final CountDownLatch closed = new CountDownLatch(1); public NettyServer(Responder responder, InetSocketAddress addr) { this.responder = responder; @@ -133,14 +133,13 @@ public class NettyServer implements Serv NettyDataPack dataPack = (NettyDataPack) e.getMessage(); List 
req = dataPack.getDatas(); List res = responder.respond(req, connectionMetadata); - dataPack.setDatas(res); - e.getChannel().write(dataPack); + // response will be null for oneway messages. + if(res != null) { + dataPack.setDatas(res); + e.getChannel().write(dataPack); + } } catch (IOException ex) { LOG.warn("unexpect error"); - } finally { - if(!connectionMetadata.isConnected()){ - e.getChannel().close(); - } } } Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1136961&r1=1136960&r2=1136961&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original) +++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Fri Jun 17 17:55:21 2011 @@ -27,10 +27,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.atomic.AtomicInteger; import org.apache.avro.Protocol; @@ -45,6 +43,7 @@ import org.jboss.netty.channel.ChannelFu import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; @@ -61,26 +60,39 @@ public class NettyTransceiver extends Tr 
private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class .getName()); - private ChannelFactory channelFactory; - private Channel channel; + private final AtomicInteger serialGenerator = new AtomicInteger(0); + private final Map>> requests = + new ConcurrentHashMap>>(); - private AtomicInteger serialGenerator = new AtomicInteger(0); - private Map requests = - new ConcurrentHashMap(); + private final ChannelFactory channelFactory; + private final ClientBootstrap bootstrap; + private final InetSocketAddress remoteAddr; - private Protocol remote; + /** + * Read lock must be acquired whenever using non-final state. + * Write lock must be acquired whenever modifying state. + */ + private final ReadWriteLock stateLock = new ReentrantReadWriteLock(); + private boolean open = false; // Synchronized on stateLock + private Channel channel; // Synchronized on stateLock + private Protocol remote; // Synchronized on stateLock + + NettyTransceiver() { + channelFactory = null; + bootstrap = null; + remoteAddr = null; + } - NettyTransceiver() {} - public NettyTransceiver(InetSocketAddress addr) { - this(addr, new NioClientSocketChannelFactory(Executors. - newCachedThreadPool(), Executors.newCachedThreadPool())); + this(addr, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), + Executors.newCachedThreadPool())); } public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) { // Set up. this.channelFactory = channelFactory; - ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); + bootstrap = new ClientBootstrap(channelFactory); + remoteAddr = addr; // Configure the event pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @@ -97,54 +109,160 @@ public class NettyTransceiver extends Tr bootstrap.setOption("tcpNoDelay", true); // Make a new connection. 
- ChannelFuture channelFuture = bootstrap.connect(addr); - channelFuture.awaitUninterruptibly(); - if (!channelFuture.isSuccess()) { - channelFuture.getCause().printStackTrace(); - throw new RuntimeException(channelFuture.getCause()); + connect(); + } + + /** + * Connects to the remote peer if not already connected. + */ + private void connect() { + stateLock.writeLock().lock(); + try { + if (!open || (channel == null) || !channel.isOpen() || !channel.isBound() || !channel.isConnected()) { + LOG.info("Connecting to " + remoteAddr); + ChannelFuture channelFuture = bootstrap.connect(remoteAddr); + channelFuture.awaitUninterruptibly(); + if (!channelFuture.isSuccess()) { + channelFuture.getCause().printStackTrace(); + throw new RuntimeException(channelFuture.getCause()); + } + channel = channelFuture.getChannel(); + open = true; + } + } finally { + stateLock.writeLock().unlock(); + } + } + + /** + * Closes the connection to the remote peer if connected. + */ + private void disconnect() { + disconnect(false); + } + + /** + * Closes the connection to the remote peer if connected. + * @param awaitCompletion if true, will block until the close has completed. + */ + private void disconnect(boolean awaitCompletion) { + stateLock.writeLock().lock(); + try { + if (channel != null) { + LOG.info("Disconnecting from " + remoteAddr); + ChannelFuture closeFuture = channel.close(); + if (awaitCompletion) { + closeFuture.awaitUninterruptibly(); + } + channel = null; + remote = null; + open = false; + } + } finally { + stateLock.writeLock().unlock(); } - channel = channelFuture.getChannel(); + } + + /** + * Netty channels are thread-safe, so there is no need to acquire locks. + * This method is a no-op. + */ + @Override + public void lockChannel() { + + } + + /** + * Netty channels are thread-safe, so there is no need to acquire locks. + * This method is a no-op. + */ + @Override + public void unlockChannel() { + } public void close() { - // Close the connection. 
- channel.close().awaitUninterruptibly(); - // Shut down all thread pools to exit. - channelFactory.releaseExternalResources(); + stateLock.writeLock().lock(); + try { + // Close the connection. + disconnect(true); + // Shut down all thread pools to exit. + channelFactory.releaseExternalResources(); + } finally { + stateLock.writeLock().unlock(); + } } @Override public String getRemoteName() { - return channel.getRemoteAddress().toString(); + stateLock.readLock().lock(); + try { + return channel.getRemoteAddress().toString(); + } finally { + stateLock.readLock().unlock(); + } } /** * Override as non-synchronized method because the method is thread safe. */ @Override - public List transceive(List request) - throws IOException { - int serial = serialGenerator.incrementAndGet(); - NettyDataPack dataPack = new NettyDataPack(serial, request); - CallFuture callFuture = new CallFuture(); - requests.put(serial, callFuture); - channel.write(dataPack); + public List transceive(List request) { try { - return callFuture.get(); + CallFuture> transceiverFuture = new CallFuture>(); + transceive(request, transceiverFuture); + return transceiverFuture.get(); } catch (InterruptedException e) { LOG.warn("failed to get the response", e); return null; } catch (ExecutionException e) { LOG.warn("failed to get the response", e); return null; + } + } + + @Override + public void transceive(List request, Callback> callback) { + stateLock.readLock().lock(); + try { + int serial = serialGenerator.incrementAndGet(); + NettyDataPack dataPack = new NettyDataPack(serial, request); + requests.put(serial, callback); + writeDataPack(dataPack); } finally { - requests.remove(serial); + stateLock.readLock().unlock(); } } - + @Override public void writeBuffers(List buffers) throws IOException { - throw new UnsupportedOperationException(); + writeDataPack(new NettyDataPack(serialGenerator.incrementAndGet(), buffers)); + } + + /** + * Writes a NettyDataPack, reconnecting to the remote peer if necessary. 
+ * @param dataPack the data pack to write. + */ + private void writeDataPack(NettyDataPack dataPack) { + stateLock.readLock().lock(); + try { + while ((channel == null) || !channel.isOpen() || !channel.isBound() || !channel.isConnected()) { + // Need to reconnect + // Upgrade to write lock + stateLock.readLock().unlock(); + stateLock.writeLock().lock(); + try { + connect(); + } finally { + // Downgrade to read lock: + stateLock.readLock().lock(); + stateLock.writeLock().unlock(); + } + } + channel.write(dataPack); + } finally { + stateLock.readLock().unlock(); + } } @Override @@ -154,71 +272,32 @@ public class NettyTransceiver extends Tr @Override public Protocol getRemote() { - return remote; + stateLock.readLock().lock(); + try { + return remote; + } finally { + stateLock.readLock().unlock(); + } } @Override public boolean isConnected() { - return remote!=null; + stateLock.readLock().lock(); + try { + return remote!=null; + } finally { + stateLock.readLock().unlock(); + } } @Override public void setRemote(Protocol protocol) { - this.remote = protocol; - } - - /** - * Future class for a RPC call - */ - class CallFuture implements Future>{ - private Semaphore sem = new Semaphore(0); - private List response = null; - - public void setResponse(List response) { - this.response = response; - sem.release(); - } - - public void releaseSemphore() { - sem.release(); - } - - public List getResponse() { - return response; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public List get() throws InterruptedException, - ExecutionException { - sem.acquire(); - return response; - } - - @Override - public List get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - if (sem.tryAcquire(timeout, unit)) { - return response; - } else { - throw new TimeoutException(); - } - } - - @Override - public boolean 
isDone() { - return sem.availablePermits()>0; + stateLock.writeLock().lock(); + try { + this.remote = protocol; + } finally { + stateLock.writeLock().unlock(); } - } /** @@ -231,6 +310,33 @@ public class NettyTransceiver extends Tr throws Exception { if (e instanceof ChannelStateEvent) { LOG.info(e.toString()); + ChannelStateEvent cse = (ChannelStateEvent)e; + if ((cse.getState() == ChannelState.OPEN) && (Boolean.FALSE.equals(cse.getValue()))) { + // Server closed connection; disconnect client side + LOG.info("Remote peer " + remoteAddr + " closed connection."); + stateLock.readLock().lock(); + boolean readLockAcquired = true; + try { + // Only disconnect if open to prevent deadlock on close() + if (open) { + // Upgrade to write lock: + stateLock.readLock().unlock(); + readLockAcquired = false; + stateLock.writeLock().lock(); + try { + if (open) { + disconnect(); + } + } finally { + stateLock.writeLock().unlock(); + } + } + } finally { + if (readLockAcquired) { + stateLock.readLock().unlock(); + } + } + } } super.handleUpstream(ctx, e); } @@ -245,11 +351,15 @@ public class NettyTransceiver extends Tr @Override public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) { NettyDataPack dataPack = (NettyDataPack)e.getMessage(); - CallFuture callFuture = requests.get(dataPack.getSerial()); - if (callFuture==null) { + Callback> callback = requests.get(dataPack.getSerial()); + if (callback==null) { throw new RuntimeException("Missing previous call info"); } - callFuture.setResponse(dataPack.getDatas()); + try { + callback.handleResult(dataPack.getDatas()); + } finally { + requests.remove(dataPack.getSerial()); + } } @Override @@ -257,9 +367,9 @@ public class NettyTransceiver extends Tr LOG.warn("Unexpected exception from downstream.", e.getCause()); e.getChannel().close(); // let the blocking waiting exit - Iterator it = requests.values().iterator(); + Iterator>> it = requests.values().iterator(); while(it.hasNext()) { - it.next().releaseSemphore(); + 
it.next().handleError(e.getCause()); it.remove(); } Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java?rev=1136961&r1=1136960&r2=1136961&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java (original) +++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java Fri Jun 17 17:55:21 2011 @@ -20,9 +20,11 @@ package org.apache.avro.ipc; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.ReentrantLock; import java.util.List; import java.util.Map; @@ -57,12 +59,13 @@ public abstract class Requestor { private static final GenericDatumWriter> META_WRITER = new GenericDatumWriter>(META); - private Protocol local; - private Protocol remote; - private boolean sendLocalText; - private Transceiver transceiver; + private final Protocol local; + private volatile Protocol remote; + private volatile boolean sendLocalText; + private final Transceiver transceiver; + private final ReentrantLock handshakeLock = new ReentrantLock(); - protected List rpcMetaPlugins; + protected final List rpcMetaPlugins; public Protocol getLocal() { return local; } public Transceiver getTransceiver() { return transceiver; } @@ -72,7 +75,7 @@ public abstract class Requestor { this.local = local; this.transceiver = transceiver; this.rpcMetaPlugins = - Collections.synchronizedList(new ArrayList()); + new CopyOnWriteArrayList(); } /** @@ -85,93 +88,96 @@ public abstract 
class Requestor { } private static final EncoderFactory ENCODER_FACTORY = new EncoderFactory(); - private BinaryEncoder encoder = - ENCODER_FACTORY.binaryEncoder(new ByteBufferOutputStream(), null); /** Writes a request message and reads a response or error message. */ - public synchronized Object request(String messageName, Object request) + public Object request(String messageName, Object request) throws Exception { - Transceiver t = getTransceiver(); - BinaryDecoder in = null; - Message m; - RPCContext context = new RPCContext(); - do { - ByteBufferOutputStream bbo = new ByteBufferOutputStream(); - //safe to use encoder because this is synchronized - BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(bbo, encoder); - - // use local protocol to write request - m = getLocal().getMessages().get(messageName); - if (m == null) - throw new AvroRuntimeException("Not a local message: "+messageName); - context.setMessage(m); + // Initialize request + Request rpcRequest = new Request(messageName, request, new RPCContext()); + CallFuture future = /* only need a Future for two-way messages */ + rpcRequest.getMessage().isOneWay() ? 
null : new CallFuture(); - writeRequest(m.getRequest(), request, out); // write request payload - - out.flush(); - List payload = bbo.getBufferList(); - - writeHandshake(out); // prepend handshake if needed - - context.setRequestPayload(payload); - for (RPCPlugin plugin : rpcMetaPlugins) { - plugin.clientSendRequest(context); // get meta-data from plugins + // Send request + request(rpcRequest, future); + + if (future == null) // the message is one-way, so return immediately + return null; + try { // the message is two-way, wait for the result + return future.get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof Exception) { + throw (Exception)e.getCause(); + } else { + throw new RuntimeException(e.getCause()); } - META_WRITER.write(context.requestCallMeta(), out); - - out.writeString(m.getName()); // write message name - - out.flush(); - bbo.append(payload); - - List requestBytes = bbo.getBufferList(); - - if (m.isOneWay() && t.isConnected()) { // send one-way message - t.writeBuffers(requestBytes); - - return null; - } else { // two-way message - List response = t.transceive(requestBytes); - ByteBufferInputStream bbi = new ByteBufferInputStream(response); - in = DecoderFactory.get().binaryDecoder(bbi, in); + } + } + + /** + * Writes a request message and returns the result through a Callback. + * Clients can also use a Future interface by creating a new CallFuture, + * passing it in as the Callback parameter, and then waiting on that Future. + * @param the return type of the message. + * @param messageName the name of the message to invoke. + * @param request the request data to send. + * @param callback the callback which will be invoked when the response is returned + * or an error occurs. + * @throws Exception if an error occurs sending the message. 
+ */ + public void request(String messageName, Object request, Callback callback) + throws Exception { + request(new Request(messageName, request, new RPCContext()), callback); + } + + /** Writes a request message and returns the result through a Callback. */ + void request(Request request, Callback callback) + throws Exception { + Transceiver t = getTransceiver(); + if (!t.isConnected()) { + // Acquire handshake lock so that only one thread is performing the + // handshake and other threads block until the handshake is completed + handshakeLock.lock(); + try { + if (t.isConnected()) { + // Another thread already completed the handshake; no need to hold + // the write lock + handshakeLock.unlock(); + } else { + CallFuture callFuture = new CallFuture(callback); + t.transceive(request.getBytes(), + new TransceiverCallback(request, callFuture)); + // Block until handshake complete + callFuture.await(); + return; + } + } finally{ + if (handshakeLock.isHeldByCurrentThread()) { + handshakeLock.unlock(); + } } - } while (!readHandshake(in)); - - // use remote protocol to read response - Message rm = remote.getMessages().get(messageName); - if (rm == null) - throw new AvroRuntimeException("Not a remote message: "+messageName); - - if ((m.isOneWay() != rm.isOneWay()) && t.isConnected()) - throw new AvroRuntimeException("Not both one-way messages: "+messageName); - - if (m.isOneWay() && t.isConnected()) return null; // one-way w/ handshake - - context.setResponseCallMeta(META_READER.read(null, in)); - - if (!in.readBoolean()) { // no error - Object response = readResponse(rm.getResponse(), m.getResponse(), in); - context.setResponse(response); - for (RPCPlugin plugin : rpcMetaPlugins) { - plugin.clientReceiveResponse(context); + } + + if (request.getMessage().isOneWay()) { + t.lockChannel(); + try { + t.writeBuffers(request.getBytes()); + if (callback != null) { + callback.handleResult(null); + } + } finally { + t.unlockChannel(); } - return response; - } else { - Exception 
error = readError(rm.getErrors(), m.getErrors(), in); - context.setError(error); - for (RPCPlugin plugin : rpcMetaPlugins) { - plugin.clientReceiveResponse(context); - } - throw error; + t.transceive(request.getBytes(), + new TransceiverCallback(request, callback)); } } - private static final Map REMOTE_HASHES = - Collections.synchronizedMap(new HashMap()); - private static final Map REMOTE_PROTOCOLS = - Collections.synchronizedMap(new HashMap()); + private static final ConcurrentMap REMOTE_HASHES = + new ConcurrentHashMap(); + private static final ConcurrentMap REMOTE_PROTOCOLS = + new ConcurrentHashMap(); private static final SpecificDatumWriter HANDSHAKE_WRITER = new SpecificDatumWriter(HandshakeRequest.class); @@ -185,10 +191,11 @@ public abstract class Requestor { localHash.bytes(local.getMD5()); String remoteName = transceiver.getRemoteName(); MD5 remoteHash = REMOTE_HASHES.get(remoteName); - remote = REMOTE_PROTOCOLS.get(remoteHash); if (remoteHash == null) { // guess remote is local remoteHash = localHash; remote = local; + } else { + remote = REMOTE_PROTOCOLS.get(remoteHash); } HandshakeRequest handshake = new HandshakeRequest(); handshake.clientHash = localHash; @@ -244,30 +251,36 @@ public abstract class Requestor { remote = Protocol.parse(handshake.serverProtocol.toString()); MD5 remoteHash = (MD5)handshake.serverHash; REMOTE_HASHES.put(transceiver.getRemoteName(), remoteHash); - if (!REMOTE_PROTOCOLS.containsKey(remoteHash)) - REMOTE_PROTOCOLS.put(remoteHash, remote); + REMOTE_PROTOCOLS.putIfAbsent(remoteHash, remote); } /** Return the remote protocol. Force a handshake if required. 
*/ - public synchronized Protocol getRemote() throws IOException { + public Protocol getRemote() throws IOException { if (remote != null) return remote; // already have it MD5 remoteHash = REMOTE_HASHES.get(transceiver.getRemoteName()); - remote = REMOTE_PROTOCOLS.get(remoteHash); - if (remote != null) return remote; // already cached - // force handshake - ByteBufferOutputStream bbo = new ByteBufferOutputStream(); - // direct because the payload is tiny. - Encoder out = ENCODER_FACTORY.directBinaryEncoder(bbo, null); - writeHandshake(out); - out.writeInt(0); // empty metadata - out.writeString(""); // bogus message name - List response = - getTransceiver().transceive(bbo.getBufferList()); - ByteBufferInputStream bbi = new ByteBufferInputStream(response); - BinaryDecoder in = - DecoderFactory.get().binaryDecoder(bbi, null); - readHandshake(in); - return this.remote; + if (remoteHash != null) { + remote = REMOTE_PROTOCOLS.get(remoteHash); + if (remote != null) return remote; // already cached + } + handshakeLock.lock(); + try { + // force handshake + ByteBufferOutputStream bbo = new ByteBufferOutputStream(); + // direct because the payload is tiny. + Encoder out = ENCODER_FACTORY.directBinaryEncoder(bbo, null); + writeHandshake(out); + out.writeInt(0); // empty metadata + out.writeString(""); // bogus message name + List response = + getTransceiver().transceive(bbo.getBufferList()); + ByteBufferInputStream bbi = new ByteBufferInputStream(response); + BinaryDecoder in = + DecoderFactory.get().binaryDecoder(bbi, null); + readHandshake(in); + return this.remote; + } finally { + handshakeLock.unlock(); + } } @@ -292,5 +305,249 @@ public abstract class Requestor { /** Reads an error message. */ public abstract Exception readError(Schema writer, Schema reader, Decoder in) throws IOException; -} + + /** + * Handles callbacks from transceiver invocations. 
+ */ + protected class TransceiverCallback implements Callback> { + private final Request request; + private final Callback callback; + + /** + * Creates a TransceiverCallback. + * @param request the request to set. + * @param callback the callback to set. + */ + public TransceiverCallback(Request request, Callback callback) { + this.request = request; + this.callback = callback; + } + + @Override + @SuppressWarnings("unchecked") + public void handleResult(List responseBytes) { + ByteBufferInputStream bbi = new ByteBufferInputStream(responseBytes); + BinaryDecoder in = DecoderFactory.get().binaryDecoder(bbi, null); + try { + if (!readHandshake(in)) { + // Resend the handshake and return + Request handshake = new Request(request); + getTransceiver().transceive + (handshake.getBytes(), + new TransceiverCallback(handshake, callback)); + return; + } + } catch (Exception e) { + LOG.error("Error handling transceiver callback: " + e, e); + } + + // Read response; invoke callback + Response response = new Response(request, in); + Object responseObject; + try { + try { + responseObject = response.getResponse(); + } catch (Exception e) { + if (callback != null) { + callback.handleError(e); + } + return; + } + if (callback != null) { + callback.handleResult((T)responseObject); + } + } catch (Throwable t) { + LOG.error("Error in callback handler: " + t, t); + } + } + + @Override + public void handleError(Throwable error) { + callback.handleError(error); + } + } + + /** + * Encapsulates/generates a request. + */ + class Request { + private final String messageName; + private final Object request; + private final RPCContext context; + private final BinaryEncoder encoder; + private Message message; + private List requestBytes; + + /** + * Creates a Request. + * @param messageName the name of the message to invoke. + * @param request the request data to send. + * @param context the RPC context to use. 
+ */ + public Request(String messageName, Object request, RPCContext context) { + this(messageName, request, context, null); + } + + /** + * Creates a Request. + * @param messageName the name of the message to invoke. + * @param request the request data to send. + * @param context the RPC context to use. + * @param encoder the BinaryEncoder to use to serialize the request. + */ + public Request(String messageName, Object request, RPCContext context, + BinaryEncoder encoder) { + this.messageName = messageName; + this.request = request; + this.context = context; + this.encoder = + ENCODER_FACTORY.binaryEncoder(new ByteBufferOutputStream(), encoder); + } + + /** + * Copy constructor. + * @param other Request from which to copy fields. + */ + public Request(Request other) { + this.messageName = other.messageName; + this.request = other.request; + this.context = other.context; + this.encoder = other.encoder; + } + + /** + * Gets the message name. + * @return the message name. + */ + public String getMessageName() { + return messageName; + } + + /** + * Gets the RPC context. + * @return the RPC context. + */ + public RPCContext getContext() { + return context; + } + + /** + * Gets the Message associated with this request. + * @return this request's message. + */ + public Message getMessage() { + if (message == null) { + message = getLocal().getMessages().get(messageName); + if (message == null) { + throw new AvroRuntimeException("Not a local message: "+messageName); + } + } + return message; + } + + /** + * Gets the request data, generating it first if necessary. + * @return the request data. + * @throws Exception if an error occurs generating the request data. 
+ */ + public List getBytes() + throws Exception { + if (requestBytes == null) { + ByteBufferOutputStream bbo = new ByteBufferOutputStream(); + BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(bbo, encoder); + + // use local protocol to write request + Message m = getMessage(); + context.setMessage(m); + + writeRequest(m.getRequest(), request, out); // write request payload + + out.flush(); + List payload = bbo.getBufferList(); + + writeHandshake(out); // prepend handshake if needed + + context.setRequestPayload(payload); + for (RPCPlugin plugin : rpcMetaPlugins) { + plugin.clientSendRequest(context); // get meta-data from plugins + } + META_WRITER.write(context.requestCallMeta(), out); + out.writeString(m.getName()); // write message name + + out.flush(); + bbo.append(payload); + + requestBytes = bbo.getBufferList(); + } + return requestBytes; + } + } + + /** + * Encapsulates/parses a response. + */ + class Response { + private final Request request; + private final BinaryDecoder in; + + /** + * Creates a Response. + * @param request the Request associated with this response. + */ + public Response(Request request) { + this(request, null); + } + + /** + * Creates a Response. + * @param request the Request associated with this response. + * @param in the BinaryDecoder to use to deserialize the response. + */ + public Response(Request request, BinaryDecoder in) { + this.request = request; + this.in = in; + } + + /** + * Gets the RPC response, reading/deserializing it first if necessary. + * @return the RPC response. + * @throws Exception if an error occurs reading/deserializing the response.
+ */ + public Object getResponse() + throws Exception { + Message lm = request.getMessage(); + Message rm = remote.getMessages().get(request.getMessageName()); + if (rm == null) + throw new AvroRuntimeException + ("Not a remote message: "+request.getMessageName()); + + Transceiver t = getTransceiver(); + if ((lm.isOneWay() != rm.isOneWay()) && t.isConnected()) + throw new AvroRuntimeException + ("Not both one-way messages: "+request.getMessageName()); + + if (lm.isOneWay() && t.isConnected()) return null; // one-way w/ handshake + + RPCContext context = request.getContext(); + context.setResponseCallMeta(META_READER.read(null, in)); + + if (!in.readBoolean()) { // no error + Object response = readResponse(rm.getResponse(), lm.getResponse(), in); + context.setResponse(response); + for (RPCPlugin plugin : rpcMetaPlugins) { + plugin.clientReceiveResponse(context); + } + return response; + + } else { + Exception error = readError(rm.getErrors(), lm.getErrors(), in); + context.setError(error); + for (RPCPlugin plugin : rpcMetaPlugins) { + plugin.clientReceiveResponse(context); + } + throw error; + } + } + } +} Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java?rev=1136961&r1=1136960&r2=1136961&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java (original) +++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java Fri Jun 17 17:55:21 2011 @@ -21,9 +21,8 @@ package org.apache.avro.ipc; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.List; 
import java.util.Map; @@ -62,12 +61,12 @@ public abstract class Responder { private static final ThreadLocal REMOTE = new ThreadLocal(); - private Map protocols - = Collections.synchronizedMap(new HashMap()); + private final Map protocols + = new ConcurrentHashMap(); - private Protocol local; - private MD5 localHash; - protected List rpcMetaPlugins; + private final Protocol local; + private final MD5 localHash; + protected final List rpcMetaPlugins; protected Responder(Protocol local) { this.local = local; @@ -75,7 +74,7 @@ public abstract class Responder { localHash.bytes(local.getMD5()); protocols.put(localHash, local); this.rpcMetaPlugins = - Collections.synchronizedList(new ArrayList()); + new CopyOnWriteArrayList(); } /** Return the remote protocol. Accesses a {@link ThreadLocal} that's set Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java?rev=1136961&r1=1136960&r2=1136961&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java (original) +++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java Fri Jun 17 17:55:21 2011 @@ -22,21 +22,58 @@ import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; import org.apache.avro.Protocol; /** Base transport class used by {@link Requestor}. */ public abstract class Transceiver implements Closeable { + private final ReentrantLock channelLock = new ReentrantLock(); public abstract String getRemoteName(); + + /** + * Acquires an exclusive lock on the transceiver's channel. 
+ */ + public void lockChannel() { + channelLock.lock(); + } + + /** + * Releases the lock on the transceiver's channel if held by the calling thread. + */ + public void unlockChannel() { + if (channelLock.isHeldByCurrentThread()) { + channelLock.unlock(); + } + } /** Called by {@link Requestor#request(String,Object)} for two-way messages. * By default calls {@link #writeBuffers(List)} followed by * {@link #readBuffers()}. */ - public synchronized List transceive(List request) + public List transceive(List request) + throws IOException { + lockChannel(); + try { + writeBuffers(request); + return readBuffers(); + } finally { + unlockChannel(); + } + } + + /** + * Called by {@link Requestor#request(String,Object,Callback)} for two-way messages using callbacks. + */ + public void transceive(List request, Callback> callback) throws IOException { - writeBuffers(request); - return readBuffers(); + // The default implementation works synchronously + try { + List response = transceive(request); + callback.handleResult(response); + } catch (IOException e) { + callback.handleError(e); + } } /** Called by the default definition of {@link #transceive(List)}.*/ Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java?rev=1136961&r1=1136960&r2=1136961&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java (original) +++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java Fri Jun 17 17:55:21 2011 @@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericDa import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.Decoder; import org.apache.avro.io.Encoder; +import 
org.apache.avro.ipc.Callback; import org.apache.avro.ipc.Requestor; import org.apache.avro.ipc.Transceiver; @@ -53,6 +54,20 @@ public class GenericRequestor extends Re } @Override + public void request(String messageName, Object request, Callback callback) + throws IOException { + try { + super.request(messageName, request, callback); + } catch (Exception e) { + if (e instanceof RuntimeException) + throw (RuntimeException)e; + if (e instanceof IOException) + throw (IOException)e; + throw new AvroRemoteException(e); + } + } + + @Override public void writeRequest(Schema schema, Object request, Encoder out) throws IOException { new GenericDatumWriter(schema).write(request, out); Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java?rev=1136961&r1=1136960&r2=1136961&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java (original) +++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java Fri Jun 17 17:55:21 2011 @@ -22,6 +22,8 @@ import java.io.IOException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Type; +import java.util.Arrays; import org.apache.avro.Protocol; import org.apache.avro.Schema; @@ -32,6 +34,7 @@ import org.apache.avro.io.Decoder; import org.apache.avro.io.Encoder; import org.apache.avro.ipc.Transceiver; import org.apache.avro.ipc.Requestor; +import org.apache.avro.ipc.Callback; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; @@ -52,7 +55,20 @@ public class SpecificRequestor 
extends R @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - return request(method.getName(), args); + // Check if this is a callback-based RPC: + Type[] parameterTypes = method.getParameterTypes(); + if ((parameterTypes.length > 0) && + (parameterTypes[parameterTypes.length - 1] instanceof Class) && + Callback.class.isAssignableFrom(((Class)parameterTypes[parameterTypes.length - 1]))) { + // Extract the Callback from the end of the argument list + Object[] finalArgs = Arrays.copyOf(args, args.length - 1); + Callback callback = (Callback)args[args.length - 1]; + request(method.getName(), finalArgs, callback); + return null; + } + else { + return request(method.getName(), args); + } } protected DatumWriter getDatumWriter(Schema schema) { Modified: avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java?rev=1136961&r1=1136960&r2=1136961&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java (original) +++ avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java Fri Jun 17 17:55:21 2011 @@ -18,7 +18,11 @@ package org.apache.avro.ipc; +import static org.junit.Assert.assertEquals; + import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import junit.framework.Assert; @@ -27,52 +31,109 @@ import org.apache.avro.ipc.specific.Spec import org.apache.avro.test.Mail; import org.apache.avro.test.Message; import org.apache.avro.util.Utf8; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; public class TestNettyServer { + private static Server server; + private static Transceiver transceiver; + private
static Mail proxy; + private static MailImpl mailService; + public static class MailImpl implements Mail { + + private CountDownLatch allMessages = new CountDownLatch(5); + // in this simple example just return details of the message public CharSequence send(Message message) { return new Utf8("Sent message to [" + message.to.toString() + "] from [" + message.from.toString() + "] with body [" + message.body.toString() + "]"); } - } + + public void fireandforget(Message message) { + allMessages.countDown(); + } + + private void awaitMessages() throws InterruptedException { + allMessages.await(2, TimeUnit.SECONDS); + } + + private void assertAllMessagesReceived() { + assertEquals(0, allMessages.getCount()); + } - @Test - public void test() throws Exception { + public void reset() { + allMessages = new CountDownLatch(5); + } + } + + @BeforeClass + public static void initializeConnections()throws Exception { // start server System.out.println("starting server..."); - Responder responder = new SpecificResponder(Mail.class, new MailImpl()); - Server server = new NettyServer(responder, new InetSocketAddress(0)); + mailService = new MailImpl(); + Responder responder = new SpecificResponder(Mail.class, mailService); + server = new NettyServer(responder, new InetSocketAddress(0)); server.start(); - + int serverPort = server.getPort(); System.out.println("server port : " + serverPort); - // client - Transceiver transceiver = new NettyTransceiver(new InetSocketAddress( + transceiver = new NettyTransceiver(new InetSocketAddress( serverPort)); - Mail proxy = SpecificRequestor.getClient(Mail.class, transceiver); + proxy = SpecificRequestor.getClient(Mail.class, transceiver); + } + + @AfterClass + public static void tearDownConnections() throws Exception{ + transceiver.close(); + server.close(); + } + @Test + public void testRequestResponse() throws Exception { + for(int x = 0; x < 5; x++) { + verifyResponse(proxy.send(createMessage())); + } + } + + private void 
verifyResponse(CharSequence result) { + Assert.assertEquals( + "Sent message to [wife] from [husband] with body [I love you!]", + result.toString()); + } + + @Test + public void testOneway() throws Exception { + for (int x = 0; x < 5; x++) { + proxy.fireandforget(createMessage()); + } + mailService.awaitMessages(); + mailService.assertAllMessagesReceived(); + } + + @Test + public void testMixtureOfRequests() throws Exception { + mailService.reset(); + for (int x = 0; x < 5; x++) { + Message createMessage = createMessage(); + proxy.fireandforget(createMessage); + verifyResponse(proxy.send(createMessage)); + } + mailService.awaitMessages(); + mailService.assertAllMessagesReceived(); + + } + + private Message createMessage() { Message msg = new Message(); msg.to = new Utf8("wife"); msg.from = new Utf8("husband"); msg.body = new Utf8("I love you!"); - - try { - for(int x = 0; x < 5; x++) { - CharSequence result = proxy.send(msg); - System.out.println("Result: " + result); - Assert.assertEquals( - "Sent message to [wife] from [husband] with body [I love you!]", - result.toString()); - } - } finally { - transceiver.close(); - server.close(); - } + return msg; } } Modified: avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java?rev=1136961&r1=1136960&r2=1136961&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java (original) +++ avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java Fri Jun 17 17:55:21 2011 @@ -94,5 +94,7 @@ public class TestRpcPluginOrdering { public CharSequence send(Message message) throws AvroRemoteException { return new Utf8("Received"); } + public void fireandforget(Message message) { + } } } 
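Note on the TestNettyServer changes above: the one-way tests count received messages with a CountDownLatch and wait on it with a timeout, because a fire-and-forget call returns before the server has handled it. The following is a minimal standalone sketch of that pattern only. It does not use Avro or Netty; the class OneWayLatchSketch, the Receiver type, and sendAndAwait are hypothetical names made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class OneWayLatchSketch {

  /** Hypothetical stand-in for a service that receives one-way messages. */
  static class Receiver {
    private final CountDownLatch latch;

    Receiver(int expected) {
      latch = new CountDownLatch(expected);
    }

    /** Called once per delivered message; nothing is returned to the sender. */
    void fireAndForget(String message) {
      latch.countDown();
    }

    /** Blocks until every expected message has arrived or the timeout expires. */
    boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
      return latch.await(timeout, unit);
    }
  }

  /**
   * Delivers count messages from a background thread (an assumption that
   * mimics asynchronous delivery) and reports whether all of them arrived
   * within two seconds.
   */
  static boolean sendAndAwait(final int count) {
    final Receiver receiver = new Receiver(count);
    Thread sender = new Thread(() -> {
      for (int i = 0; i < count; i++) {
        receiver.fireAndForget("message " + i);
      }
    });
    sender.start();
    try {
      return receiver.awaitAll(2, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("all received: " + sendAndAwait(5));
  }
}
```

Because a latch cannot be rewound, a test that reuses the receiver has to install a fresh latch before each run, which appears to be the purpose of reset() in MailImpl.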
Modified: avro/branches/branch-1.5/share/test/schemas/mail.avpr URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/share/test/schemas/mail.avpr?rev=1136961&r1=1136960&r2=1136961&view=diff ============================================================================== --- avro/branches/branch-1.5/share/test/schemas/mail.avpr (original) +++ avro/branches/branch-1.5/share/test/schemas/mail.avpr Fri Jun 17 17:55:21 2011 @@ -15,6 +15,12 @@ "send": { "request": [{"name": "message", "type": "Message"}], "response": "string" + }, + "fireandforget": { + "request": [{"name": "message", "type": "Message"}], + "response": "null", + "one-way": true } + } -} \ No newline at end of file +}
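Note on the Transceiver change in this commit: the default callback-based transceive() runs the blocking call and routes its outcome to the callback, so transports without native asynchronous support still satisfy the callback API. The following is a minimal standalone sketch of that adapter idea, not the Avro implementation. The Callback interface here is a simplified local copy modeled on the handleResult/handleError pair seen in the diff, and BlockingCall, invoke, and describe are hypothetical names made up for illustration.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackAdapterSketch {

  /** Simplified local callback type (assumption: mirrors handleResult/handleError). */
  interface Callback<T> {
    void handleResult(T result);
    void handleError(Throwable error);
  }

  /** Hypothetical stand-in for a synchronous transceive that may fail with IOException. */
  interface BlockingCall<T> {
    T call() throws IOException;
  }

  /** Runs the blocking call on the calling thread and hands the outcome to the callback. */
  static <T> void invoke(BlockingCall<T> call, Callback<T> callback) {
    try {
      callback.handleResult(call.call());
    } catch (IOException e) {
      callback.handleError(e);
    }
  }

  /** Helper for demonstration: records which callback method fired and with what. */
  static String describe(BlockingCall<String> call) {
    final AtomicReference<String> outcome = new AtomicReference<>();
    invoke(call, new Callback<String>() {
      @Override
      public void handleResult(String result) {
        outcome.set("result: " + result);
      }

      @Override
      public void handleError(Throwable error) {
        outcome.set("error: " + error.getMessage());
      }
    });
    return outcome.get();
  }

  public static void main(String[] args) {
    System.out.println(describe(() -> "pong"));
    System.out.println(describe(() -> {
      throw new IOException("connection reset");
    }));
  }
}
```

With this shape the callback fires before invoke() returns, so callers get no concurrency from the default path; a transport would need to override the callback variant to make the call truly asynchronous.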