avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1136342 - in /avro/trunk: ./ lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/ lang/java/ipc/src/main/java/org/apache/avro/ipc/ lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/ lang/java...
Date Thu, 16 Jun 2011 09:33:29 GMT
Author: cutting
Date: Thu Jun 16 09:33:29 2011
New Revision: 1136342

URL: http://svn.apache.org/viewvc?rev=1136342&view=rev
Log:
AVRO-539. Java: Add asynchronous RPC support, through either callbacks or futures.  Contributed by James Baldassari.

Added:
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Jun 16 09:33:29 2011
@@ -6,6 +6,9 @@ Avro 1.6.0 (unreleased)
 
   NEW FEATURES
 
+    AVRO-539. Java: Add asynchronous RPC support, through either
+    callbacks or futures.  (James Baldassari via cutting)
+
   IMPROVEMENTS
 
     AVRO-833. Don't require simplejson for python >= 2.6.

Modified: avro/trunk/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm (original)
+++ avro/trunk/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm Thu Jun 16 09:33:29 2011
@@ -21,7 +21,7 @@ package $protocol.getNamespace();
 
 @SuppressWarnings("all")
 #if ($protocol.getDoc())
-  /** $protocol.getDoc() */
+/** $protocol.getDoc() */
 #end
 public interface $this.mangle($protocol.getName()) {
   public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("${this.javaEscape($protocol.toString())}");
@@ -46,4 +46,31 @@ public interface $this.mangle($protocol.
 #end##    (one way)
 ;
 #end## (requests)
-}
+
+## Generate nested callback API
+  @SuppressWarnings("all")
+#if ($protocol.getDoc())
+  /** $protocol.getDoc() */
+#end
+  public interface Callback extends $this.mangle($protocol.getName()) {
+    public static final org.apache.avro.Protocol PROTOCOL = #if ($protocol.getNamespace())$protocol.getNamespace().#end${this.mangle($protocol.getName())}.PROTOCOL;
+#foreach ($e in $protocol.getMessages().entrySet())
+#set ($name = $e.getKey())
+#set ($message = $e.getValue())
+#set ($response = $message.getResponse())
+## Generate callback method if the message is not one-way:
+#if (! $message.isOneWay())
+#if ($message.getDoc())
+    /** $this.escapeForJavadoc($message.getDoc()) */
+#end
+    void ${this.mangle($name)}(##
+#foreach ($p in $message.getRequest().getFields())##
+#*      *#${this.javaUnbox($p.schema())} ${this.mangle($p.name())}#if ($velocityHasNext), #end
+#end
+#if ($message.getRequest().getFields().size() > 0), #end
+org.apache.avro.ipc.Callback<${this.javaType($response)}> callback) throws java.io.IOException;
+#end## (generate callback method)
+#end## (requests)
+  }## End of Callback interface
+
+}## End of protocol interface

Added: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java?rev=1136342&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java (added)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java Thu Jun 16 09:33:29 2011
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A Future implementation for RPCs that is completed through its own {@link Callback} methods.
+ */
+public class CallFuture<T> implements Future<T>, Callback<T> {
+  private final CountDownLatch latch = new CountDownLatch(1);  // counted down by handleResult/handleError
+  private final Callback<T> chainedCallback;  // null when no chained Callback was supplied
+  private T result = null;
+  private Throwable error = null;
+  
+  /**
+   * Creates a CallFuture with no chained Callback.
+   */
+  public CallFuture() {
+    this(null);
+  }
+  
+  /**
+   * Creates a CallFuture with a chained Callback which will be invoked
+   * when this CallFuture's Callback methods are invoked.
+   * @param chainedCallback the chained Callback to set.
+   */
+  public CallFuture(Callback<T> chainedCallback) {
+    this.chainedCallback = chainedCallback;
+  }
+  
+  /**
+   * Sets the RPC response, and unblocks all threads waiting on {@link #get()} 
+   * or {@link #get(long, TimeUnit)}.
+   * @param result the RPC result to set.
+   */
+  @Override
+  public void handleResult(T result) {
+    this.result = result;
+    latch.countDown();  // waiters are released before the chained callback is invoked
+    if (chainedCallback != null) {
+      chainedCallback.handleResult(result);
+    }
+  }
+  
+  /**
+   * Sets an error thrown during RPC execution, and unblocks all threads waiting 
+   * on {@link #get()} or {@link #get(long, TimeUnit)}.
+   * @param error the RPC error to set.
+   */
+  @Override
+  public void handleError(Throwable error) {
+    this.error = error;
+    latch.countDown();  // waiters are released before the chained callback is invoked
+    if (chainedCallback != null) {
+      chainedCallback.handleError(error);
+    }
+  }
+
+  /**
+   * Gets the value of the RPC result without blocking.
+   * Using {@link #get()} or {@link #get(long, TimeUnit)} is usually 
+   * preferred because these methods block until the result is available or 
+   * an error occurs. 
+   * @return the value of the response, or null if no result was returned or 
+   * the RPC has not yet completed.
+   */
+  public T getResult() {
+    return result;
+  }
+  
+  /**
+   * Gets the error that was thrown during RPC execution.  Does not block.
+   * Either {@link #get()} or {@link #get(long, TimeUnit)} should be called 
+   * first because these methods block until the RPC has completed.
+   * @return the RPC error that was thrown, or null if no error has occurred or 
+   * if the RPC has not yet completed.
+   */
+  public Throwable getError() {
+    return error;
+  }
+
+  @Override
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    return false;  // always false: this method never cancels the call
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return false;  // always false, consistent with cancel()
+  }
+
+  @Override
+  public T get() throws InterruptedException,
+      ExecutionException {
+    latch.await();
+    if (error != null) {  // a non-null error takes precedence over any result
+      throw new ExecutionException(error);
+    }
+    return result;
+  }
+
+  @Override
+  public T get(long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    if (latch.await(timeout, unit)) {
+      if (error != null) {  // a non-null error takes precedence over any result
+        throw new ExecutionException(error);
+      }
+      return result;
+    } else {
+      throw new TimeoutException();
+    }
+  }
+  
+  /**
+   * Waits for the CallFuture to complete without returning the result.
+   * @throws InterruptedException if interrupted.
+   */
+  public void await() throws InterruptedException {
+    latch.await();
+  }
+  
+  /**
+   * Waits for the CallFuture to complete without returning the result.
+   * @param timeout the maximum time to wait.
+   * @param unit the time unit of the timeout argument.
+   * @throws InterruptedException if interrupted.
+   * @throws TimeoutException if the wait timed out.
+   */
+  public void await(long timeout, TimeUnit unit) 
+    throws InterruptedException, TimeoutException {
+    if (!latch.await(timeout, unit)) {
+      throw new TimeoutException();
+    }
+  }
+
+  @Override
+  public boolean isDone() {
+    return latch.getCount() <= 0;  // true once handleResult or handleError has been called
+  }
+}

Added: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java?rev=1136342&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java (added)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java Thu Jun 16 09:33:29 2011
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+/**
+ * Interface for receiving asynchronous callbacks.
+ * For each request with an asynchronous callback, 
+ * either {@link #handleResult(Object)} or {@link #handleError(Throwable)} 
+ * will be invoked.
+ */
+public interface Callback<T> {
+  /**
+   * Receives a callback result.
+   * @param result the result returned in the callback.
+   */
+  void handleResult(T result);
+  
+  /**
+   * Receives an error instead of a result.
+   * @param error the error returned in the callback.
+   */
+  void handleError(Throwable error);
+}

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java Thu Jun 16 09:33:29 2011
@@ -54,13 +54,13 @@ public class NettyServer implements Serv
   private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class
       .getName());
 
-  private Responder responder;
+  private final Responder responder;
 
-  private Channel serverChannel;
-  private ChannelGroup allChannels = new DefaultChannelGroup(
+  private final Channel serverChannel;
+  private final ChannelGroup allChannels = new DefaultChannelGroup(
       "avro-netty-server");
-  private ChannelFactory channelFactory;
-  private CountDownLatch closed = new CountDownLatch(1);
+  private final ChannelFactory channelFactory;
+  private final CountDownLatch closed = new CountDownLatch(1);
   
   public NettyServer(Responder responder, InetSocketAddress addr) {
     this.responder = responder;
@@ -140,10 +140,6 @@ public class NettyServer implements Serv
         }
       } catch (IOException ex) {
         LOG.warn("unexpect error");
-      } finally {
-        if(!connectionMetadata.isConnected()){
-          e.getChannel().close();
-        }
       }
     }
 

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Thu Jun 16 09:33:29 2011
@@ -27,10 +27,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.avro.Protocol;
@@ -45,6 +43,7 @@ import org.jboss.netty.channel.ChannelFu
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelState;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.ExceptionEvent;
@@ -61,26 +60,39 @@ public class NettyTransceiver extends Tr
   private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class
       .getName());
 
-  private ChannelFactory channelFactory;
-  private Channel channel;
+  private final AtomicInteger serialGenerator = new AtomicInteger(0);
+  private final Map<Integer, Callback<List<ByteBuffer>>> requests = 
+    new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>();
   
-  private AtomicInteger serialGenerator = new AtomicInteger(0);
-  private Map<Integer, CallFuture> requests = 
-    new ConcurrentHashMap<Integer, CallFuture>();
+  private final ChannelFactory channelFactory;
+  private final ClientBootstrap bootstrap;
+  private final InetSocketAddress remoteAddr;
   
-  private Protocol remote;
+  /**
+   * Read lock must be acquired whenever using non-final state.
+   * Write lock must be acquired whenever modifying state.
+   */
+  private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+  private boolean open = false;  // Synchronized on stateLock
+  private Channel channel;       // Synchronized on stateLock
+  private Protocol remote;       // Synchronized on stateLock
+
+  NettyTransceiver() {
+    channelFactory = null;
+    bootstrap = null;
+    remoteAddr = null;
+  }
 
-  NettyTransceiver() {}
-  
   public NettyTransceiver(InetSocketAddress addr) {
-    this(addr, new NioClientSocketChannelFactory(Executors.
-        newCachedThreadPool(), Executors.newCachedThreadPool()));
+    this(addr, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), 
+        Executors.newCachedThreadPool()));
   }
 
   public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) {
     // Set up.
     this.channelFactory = channelFactory;
-    ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+    bootstrap = new ClientBootstrap(channelFactory);
+    remoteAddr = addr;
 
     // Configure the event pipeline factory.
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@@ -97,54 +109,160 @@ public class NettyTransceiver extends Tr
     bootstrap.setOption("tcpNoDelay", true);
 
     // Make a new connection.
-    ChannelFuture channelFuture = bootstrap.connect(addr);
-    channelFuture.awaitUninterruptibly();
-    if (!channelFuture.isSuccess()) {
-      channelFuture.getCause().printStackTrace();
-      throw new RuntimeException(channelFuture.getCause());
+    connect();
+  }
+  
+  /**
+   * Connects to the remote peer if not already connected.
+   */
+  private void connect() {
+    stateLock.writeLock().lock();
+    try {
+      if (!open || (channel == null) || !channel.isOpen() || !channel.isBound() || !channel.isConnected()) {
+        LOG.info("Connecting to " + remoteAddr);
+        ChannelFuture channelFuture = bootstrap.connect(remoteAddr);
+        channelFuture.awaitUninterruptibly();
+        if (!channelFuture.isSuccess()) {
+          channelFuture.getCause().printStackTrace();
+          throw new RuntimeException(channelFuture.getCause());
+        }
+        channel = channelFuture.getChannel();
+        open = true;
+      }
+    } finally {
+      stateLock.writeLock().unlock();
+    }
+  }
+  
+  /**
+   * Closes the connection to the remote peer if connected.
+   */
+  private void disconnect() {
+    disconnect(false);
+  }
+  
+  /**
+   * Closes the connection to the remote peer if connected.
+   * @param awaitCompletion if true, will block until the close has completed.
+   */
+  private void disconnect(boolean awaitCompletion) {
+    stateLock.writeLock().lock();
+    try {
+      if (channel != null) {
+        LOG.info("Disconnecting from " + remoteAddr);
+        ChannelFuture closeFuture = channel.close();
+        if (awaitCompletion) {
+          closeFuture.awaitUninterruptibly();
+        }
+        channel = null;
+        remote = null;
+        open = false;
+      }
+    } finally {
+      stateLock.writeLock().unlock();
     }
-    channel = channelFuture.getChannel();
+  }
+  
+  /**
+   * Netty channels are thread-safe, so there is no need to acquire locks.
+   * This method is a no-op.
+   */
+  @Override
+  public void lockChannel() {
+    
+  }
+  
+  /**
+   * Netty channels are thread-safe, so there is no need to acquire locks.
+   * This method is a no-op.
+   */
+  @Override
+  public void unlockChannel() {
+    
   }
 
   public void close() {
-    // Close the connection.
-    channel.close().awaitUninterruptibly();
-    // Shut down all thread pools to exit.
-    channelFactory.releaseExternalResources();
+    stateLock.writeLock().lock();
+    try {
+      // Close the connection.
+      disconnect(true);
+      // Shut down all thread pools to exit.
+      channelFactory.releaseExternalResources();
+    } finally {
+      stateLock.writeLock().unlock();
+    }
   }
 
   @Override
   public String getRemoteName() {
-    return channel.getRemoteAddress().toString();
+    stateLock.readLock().lock();
+    try {
+      return channel.getRemoteAddress().toString();
+    } finally {
+      stateLock.readLock().unlock();
+    }
   }
 
   /**
    * Override as non-synchronized method because the method is thread safe.
    */
   @Override
-  public List<ByteBuffer> transceive(List<ByteBuffer> request)
-      throws IOException {
-    int serial = serialGenerator.incrementAndGet();
-    NettyDataPack dataPack = new NettyDataPack(serial, request);
-    CallFuture callFuture = new CallFuture();
-    requests.put(serial, callFuture);
-    channel.write(dataPack);
+  public List<ByteBuffer> transceive(List<ByteBuffer> request) {
     try {
-      return callFuture.get();
+      CallFuture<List<ByteBuffer>> transceiverFuture = new CallFuture<List<ByteBuffer>>();
+      transceive(request, transceiverFuture);
+      return transceiverFuture.get();
     } catch (InterruptedException e) {
       LOG.warn("failed to get the response", e);
       return null;
     } catch (ExecutionException e) {
       LOG.warn("failed to get the response", e);
       return null;
+    }
+  }
+  
+  @Override
+  public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) {
+    stateLock.readLock().lock();
+    try {
+      int serial = serialGenerator.incrementAndGet();
+      NettyDataPack dataPack = new NettyDataPack(serial, request);
+      requests.put(serial, callback);
+      writeDataPack(dataPack);
     } finally {
-      requests.remove(serial);
+      stateLock.readLock().unlock();
     }
   }
-
+  
   @Override
   public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
-    channel.write(new NettyDataPack(serialGenerator.incrementAndGet(), buffers));
+    writeDataPack(new NettyDataPack(serialGenerator.incrementAndGet(), buffers));
+  }
+  
+  /**
+   * Writes a NettyDataPack, reconnecting to the remote peer if necessary.
+   * @param dataPack the data pack to write.
+   */
+  private void writeDataPack(NettyDataPack dataPack) {
+    stateLock.readLock().lock();
+    try {
+      while ((channel == null) || !channel.isOpen() || !channel.isBound() || !channel.isConnected()) {
+        // Channel is missing or not usable: need to reconnect
+        // Upgrade to write lock. NOTE(review): transceive(List, Callback) calls this method while already holding the read lock, so one read hold remains after the unlock below and writeLock().lock() would then block forever - confirm.
+        stateLock.readLock().unlock();
+        stateLock.writeLock().lock();
+        try {
+          connect();
+        } finally {
+          // Downgrade to read lock (re-acquire read before releasing write, so the loop re-checks the channel under the read lock):
+          stateLock.readLock().lock();
+          stateLock.writeLock().unlock();
+        }
+      }
+      channel.write(dataPack);
+    } finally {
+      stateLock.readLock().unlock();
+    }
+  }
 
   @Override
@@ -154,71 +272,32 @@ public class NettyTransceiver extends Tr
   
   @Override
   public Protocol getRemote() {
-    return remote;
+    stateLock.readLock().lock();
+    try {
+      return remote;
+    } finally {
+      stateLock.readLock().unlock();
+    }
   }
 
   @Override
   public boolean isConnected() {
-    return remote!=null;
+    stateLock.readLock().lock();
+    try {
+      return remote!=null;
+    } finally {
+      stateLock.readLock().unlock();
+    }
   }
 
   @Override
   public void setRemote(Protocol protocol) {
-    this.remote = protocol;
-  }
-
-  /**
-   * Future class for a RPC call
-   */
-  class CallFuture implements Future<List<ByteBuffer>>{
-    private Semaphore sem = new Semaphore(0);
-    private List<ByteBuffer> response = null;
-    
-    public void setResponse(List<ByteBuffer> response) {
-      this.response = response;
-      sem.release();
-    }
-    
-    public void releaseSemphore() {
-      sem.release();
-    }
-
-    public List<ByteBuffer> getResponse() {
-      return response;
-    }
-
-    @Override
-    public boolean cancel(boolean mayInterruptIfRunning) {
-      return false;
-    }
-
-    @Override
-    public boolean isCancelled() {
-      return false;
-    }
-
-    @Override
-    public List<ByteBuffer> get() throws InterruptedException,
-        ExecutionException {
-      sem.acquire();
-      return response;
-    }
-
-    @Override
-    public List<ByteBuffer> get(long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-      if (sem.tryAcquire(timeout, unit)) {
-        return response;
-      } else {
-        throw new TimeoutException();
-      }
-    }
-
-    @Override
-    public boolean isDone() {
-      return sem.availablePermits()>0;
+    stateLock.writeLock().lock();
+    try {
+      this.remote = protocol;
+    } finally {
+      stateLock.writeLock().unlock();
     }
-    
   }
 
   /**
@@ -231,6 +310,33 @@ public class NettyTransceiver extends Tr
         throws Exception {
       if (e instanceof ChannelStateEvent) {
         LOG.info(e.toString());
+        ChannelStateEvent cse = (ChannelStateEvent)e;
+        if ((cse.getState() == ChannelState.OPEN) && (Boolean.FALSE.equals(cse.getValue()))) {
+          // Server closed connection; disconnect client side
+          LOG.info("Remote peer " + remoteAddr + " closed connection.");
+          stateLock.readLock().lock();
+          boolean readLockAcquired = true;
+          try {
+            // Only disconnect if open to prevent deadlock on close()
+            if (open) {
+              // Upgrade to write lock:
+              stateLock.readLock().unlock();
+              readLockAcquired = false;
+              stateLock.writeLock().lock();
+              try {
+                if (open) {
+                  disconnect();
+                }
+              } finally {
+                stateLock.writeLock().unlock();
+              }
+            }
+          } finally {
+            if (readLockAcquired) {
+              stateLock.readLock().unlock();
+            }
+          }
+        }
       }
       super.handleUpstream(ctx, e);
     }
@@ -245,11 +351,15 @@ public class NettyTransceiver extends Tr
     @Override
     public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
       NettyDataPack dataPack = (NettyDataPack)e.getMessage();
-      CallFuture callFuture = requests.get(dataPack.getSerial());
-      if (callFuture==null) {
+      Callback<List<ByteBuffer>> callback = requests.get(dataPack.getSerial());
+      if (callback==null) {
         throw new RuntimeException("Missing previous call info");
       }
-      callFuture.setResponse(dataPack.getDatas());
+      try {
+        callback.handleResult(dataPack.getDatas());
+      } finally {
+        requests.remove(dataPack.getSerial());
+      }
     }
 
     @Override
@@ -257,9 +367,9 @@ public class NettyTransceiver extends Tr
       LOG.warn("Unexpected exception from downstream.", e.getCause());
       e.getChannel().close();
       // let the blocking waiting exit
-      Iterator<CallFuture> it = requests.values().iterator();
+      Iterator<Callback<List<ByteBuffer>>> it = requests.values().iterator();
       while(it.hasNext()) {
-        it.next().releaseSemphore();
+        it.next().handleError(e.getCause());
         it.remove();
       }
       

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java Thu Jun 16 09:33:29 2011
@@ -20,9 +20,11 @@ package org.apache.avro.ipc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.List;
 import java.util.Map;
 
@@ -57,12 +59,13 @@ public abstract class Requestor {
   private static final GenericDatumWriter<Map<CharSequence,ByteBuffer>>
     META_WRITER = new GenericDatumWriter<Map<CharSequence,ByteBuffer>>(META);
 
-  private Protocol local;
-  private Protocol remote;
-  private boolean sendLocalText;
-  private Transceiver transceiver;
+  private final Protocol local;
+  private volatile Protocol remote;
+  private volatile boolean sendLocalText;
+  private final Transceiver transceiver;
+  private final ReentrantLock handshakeLock = new ReentrantLock();
   
-  protected List<RPCPlugin> rpcMetaPlugins;
+  protected final List<RPCPlugin> rpcMetaPlugins;
 
   public Protocol getLocal() { return local; }
   public Transceiver getTransceiver() { return transceiver; }
@@ -72,7 +75,7 @@ public abstract class Requestor {
     this.local = local;
     this.transceiver = transceiver;
     this.rpcMetaPlugins =
-      Collections.synchronizedList(new ArrayList<RPCPlugin>());
+      new CopyOnWriteArrayList<RPCPlugin>();
   }
   
   /**
@@ -85,93 +88,96 @@ public abstract class Requestor {
   }
 
   private static final EncoderFactory ENCODER_FACTORY = new EncoderFactory();
-  private BinaryEncoder encoder = 
-    ENCODER_FACTORY.binaryEncoder(new ByteBufferOutputStream(), null);
   
   /** Writes a request message and reads a response or error message. */
-  public synchronized Object request(String messageName, Object request)
+  public Object request(String messageName, Object request)
     throws Exception {
-    Transceiver t = getTransceiver();
-    BinaryDecoder in = null;
-    Message m;
-    RPCContext context = new RPCContext();
-    do {
-      ByteBufferOutputStream bbo = new ByteBufferOutputStream();
-      //safe to use encoder because this is synchronized
-      BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(bbo, encoder);
-      
-      // use local protocol to write request
-      m = getLocal().getMessages().get(messageName);
-      if (m == null)
-        throw new AvroRuntimeException("Not a local message: "+messageName);
-      context.setMessage(m);
+    // Initialize request
+    Request rpcRequest = new Request(messageName, request, new RPCContext());
+    CallFuture<Object> future = /* only need a Future for two-way messages */
+      rpcRequest.getMessage().isOneWay() ? null : new CallFuture<Object>();
     
-      writeRequest(m.getRequest(), request, out); // write request payload
-      
-      out.flush();
-      List<ByteBuffer> payload = bbo.getBufferList();
-      
-      writeHandshake(out);                       // prepend handshake if needed
-      
-      context.setRequestPayload(payload);
-      for (RPCPlugin plugin : rpcMetaPlugins) {
-        plugin.clientSendRequest(context);        // get meta-data from plugins
+    // Send request
+    request(rpcRequest, future);
+    
+    if (future == null)        // the message is one-way, so return immediately
+      return null;
+    try {                      // the message is two-way, wait for the result
+      return future.get();
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof Exception) {
+        throw (Exception)e.getCause();
+      } else {
+        throw new RuntimeException(e.getCause());
       }
-      META_WRITER.write(context.requestCallMeta(), out);
-
-      out.writeString(m.getName());               // write message name
-
-      out.flush();
-      bbo.append(payload);
-      
-      List<ByteBuffer> requestBytes = bbo.getBufferList();
-
-      if (m.isOneWay() && t.isConnected()) {      // send one-way message
-        t.writeBuffers(requestBytes);
-        
-        return null;
-      } else {                                    // two-way message
-        List<ByteBuffer> response = t.transceive(requestBytes);
-        ByteBufferInputStream bbi = new ByteBufferInputStream(response);
-        in = DecoderFactory.get().binaryDecoder(bbi, in);
+    }
+  }
+  
+  /**
+   * Writes a request message and returns the result through a Callback.
+   * Clients can also use a Future interface by creating a new CallFuture<T>,
+   * passing it in as the Callback parameter, and then waiting on that Future.
+   * @param <T> the return type of the message.
+   * @param messageName the name of the message to invoke.
+   * @param request the request data to send.
+   * @param callback the callback which will be invoked when the response is returned 
+   * or an error occurs.
+   * @throws Exception if an error occurs sending the message.
+   */
+  public <T> void request(String messageName, Object request, Callback<T> callback) 
+    throws Exception {
+    request(new Request(messageName, request, new RPCContext()), callback);
+  }
+  
+  /** Writes a request message and returns the result through a Callback. */
+  <T> void request(Request request, Callback<T> callback)
+    throws Exception {
+    Transceiver t = getTransceiver();
+    if (!t.isConnected()) {
+      // Acquire handshake lock so that only one thread is performing the
+      // handshake and other threads block until the handshake is completed
+      handshakeLock.lock();
+      try {
+        if (t.isConnected()) {
+          // Another thread already completed the handshake; no need to hold
+          // the handshake lock
+          handshakeLock.unlock();
+        } else {
+          CallFuture<T> callFuture = new CallFuture<T>(callback);
+          t.transceive(request.getBytes(),
+                       new TransceiverCallback<T>(request, callFuture));
+          // Block until handshake complete
+          callFuture.await();
+          return;
+        }
+      } finally{
+        if (handshakeLock.isHeldByCurrentThread()) {
+          handshakeLock.unlock();
+        }
       }
-    } while (!readHandshake(in));
-
-    // use remote protocol to read response
-    Message rm = remote.getMessages().get(messageName);
-    if (rm == null)
-      throw new AvroRuntimeException("Not a remote message: "+messageName);
-
-    if ((m.isOneWay() != rm.isOneWay()) && t.isConnected())
-      throw new AvroRuntimeException("Not both one-way messages: "+messageName);
-
-    if (m.isOneWay() && t.isConnected()) return null; // one-way w/ handshake
-        
-    context.setResponseCallMeta(META_READER.read(null, in));
-    
-    if (!in.readBoolean()) {                      // no error
-      Object response = readResponse(rm.getResponse(), m.getResponse(), in);
-      context.setResponse(response);
-      for (RPCPlugin plugin : rpcMetaPlugins) {
-        plugin.clientReceiveResponse(context);
+    }
+    
+    if (request.getMessage().isOneWay()) {
+      t.lockChannel();
+      try {
+        t.writeBuffers(request.getBytes());
+        if (callback != null) {
+          callback.handleResult(null);
+        }
+      } finally {
+        t.unlockChannel();
       }
-      return response;
-      
     } else {
-      Exception error = readError(rm.getErrors(), m.getErrors(), in);
-      context.setError(error);
-      for (RPCPlugin plugin : rpcMetaPlugins) {
-        plugin.clientReceiveResponse(context);
-      }
-      throw error;
+      t.transceive(request.getBytes(),
+                   new TransceiverCallback<T>(request, callback));
     }
     
   }
 
-  private static final Map<String,MD5> REMOTE_HASHES =
-    Collections.synchronizedMap(new HashMap<String,MD5>());
-  private static final Map<MD5,Protocol> REMOTE_PROTOCOLS =
-    Collections.synchronizedMap(new HashMap<MD5,Protocol>());
+  private static final ConcurrentMap<String,MD5> REMOTE_HASHES =
+    new ConcurrentHashMap<String,MD5>();
+  private static final ConcurrentMap<MD5,Protocol> REMOTE_PROTOCOLS =
+    new ConcurrentHashMap<MD5,Protocol>();
 
   private static final SpecificDatumWriter<HandshakeRequest> HANDSHAKE_WRITER =
     new SpecificDatumWriter<HandshakeRequest>(HandshakeRequest.class);
@@ -185,10 +191,11 @@ public abstract class Requestor {
     localHash.bytes(local.getMD5());
     String remoteName = transceiver.getRemoteName();
     MD5 remoteHash = REMOTE_HASHES.get(remoteName);
-    remote = REMOTE_PROTOCOLS.get(remoteHash);
     if (remoteHash == null) {                     // guess remote is local
       remoteHash = localHash;
       remote = local;
+    } else {
+      remote = REMOTE_PROTOCOLS.get(remoteHash);
     }
     HandshakeRequest handshake = new HandshakeRequest();
     handshake.clientHash = localHash;
@@ -244,30 +251,36 @@ public abstract class Requestor {
     remote = Protocol.parse(handshake.serverProtocol.toString());
     MD5 remoteHash = (MD5)handshake.serverHash;
     REMOTE_HASHES.put(transceiver.getRemoteName(), remoteHash);
-    if (!REMOTE_PROTOCOLS.containsKey(remoteHash))
-      REMOTE_PROTOCOLS.put(remoteHash, remote);
+    REMOTE_PROTOCOLS.putIfAbsent(remoteHash, remote);
   }
 
   /** Return the remote protocol.  Force a handshake if required. */
-  public synchronized Protocol getRemote() throws IOException {
+  public Protocol getRemote() throws IOException {
     if (remote != null) return remote;            // already have it
     MD5 remoteHash = REMOTE_HASHES.get(transceiver.getRemoteName());
-    remote = REMOTE_PROTOCOLS.get(remoteHash);
-    if (remote != null) return remote;            // already cached
-    // force handshake
-    ByteBufferOutputStream bbo = new ByteBufferOutputStream();
-    // direct because the payload is tiny.
-    Encoder out = ENCODER_FACTORY.directBinaryEncoder(bbo, null);
-    writeHandshake(out);
-    out.writeInt(0);                              // empty metadata
-    out.writeString("");                          // bogus message name
-    List<ByteBuffer> response =
-      getTransceiver().transceive(bbo.getBufferList());
-    ByteBufferInputStream bbi = new ByteBufferInputStream(response);
-    BinaryDecoder in =
-      DecoderFactory.get().binaryDecoder(bbi, null);
-    readHandshake(in);
-    return this.remote;
+    if (remoteHash != null) {
+      remote = REMOTE_PROTOCOLS.get(remoteHash);
+      if (remote != null) return remote;            // already cached
+    }
+    handshakeLock.lock();
+    try {
+      // force handshake
+      ByteBufferOutputStream bbo = new ByteBufferOutputStream();
+      // direct because the payload is tiny.
+      Encoder out = ENCODER_FACTORY.directBinaryEncoder(bbo, null);
+      writeHandshake(out);
+      out.writeInt(0);                              // empty metadata
+      out.writeString("");                          // bogus message name
+      List<ByteBuffer> response =
+        getTransceiver().transceive(bbo.getBufferList());
+      ByteBufferInputStream bbi = new ByteBufferInputStream(response);
+      BinaryDecoder in =
+        DecoderFactory.get().binaryDecoder(bbi, null);
+      readHandshake(in);
+      return this.remote;
+    } finally {
+      handshakeLock.unlock();
+    }
   }
 
 
@@ -292,5 +305,249 @@ public abstract class Requestor {
   /** Reads an error message. */
   public abstract Exception readError(Schema writer, Schema reader, Decoder in)
     throws IOException;
-}
+  
+  /**
+   * Handles callbacks from transceiver invocations.
+   */
+  protected class TransceiverCallback<T> implements Callback<List<ByteBuffer>> {
+    private final Request request;
+    private final Callback<T> callback;
+    
+    /**
+     * Creates a TransceiverCallback.
+     * @param request the request to set.
+     * @param callback the callback to set.
+     */
+    public TransceiverCallback(Request request, Callback<T> callback) {
+      this.request = request;
+      this.callback = callback;
+    }
+    
+    @Override
+    @SuppressWarnings("unchecked")
+    public void handleResult(List<ByteBuffer> responseBytes) {
+      ByteBufferInputStream bbi = new ByteBufferInputStream(responseBytes);
+      BinaryDecoder in = DecoderFactory.get().binaryDecoder(bbi, null);
+      try {
+        if (!readHandshake(in)) {
+          // Resend the handshake and return
+          Request handshake = new Request(request);
+          getTransceiver().transceive
+            (handshake.getBytes(),
+             new TransceiverCallback<T>(handshake, callback));
+          return;
+        }
+      } catch (Exception e) {
+        LOG.error("Error handling transceiver callback: " + e, e);
+      }
+      
+      // Read response; invoke callback
+      Response response = new Response(request, in);
+      Object responseObject;
+      try {
+        try {
+          responseObject = response.getResponse();
+        } catch (Exception e) {
+          if (callback != null) {
+            callback.handleError(e);
+          }
+          return;
+        }
+        if (callback != null) {
+          callback.handleResult((T)responseObject);
+        }
+      } catch (Throwable t) {
+        LOG.error("Error in callback handler: " + t, t);
+      }
+    }
+    
+    @Override
+    public void handleError(Throwable error) {
+      callback.handleError(error);
+    }
+  }
+  
+  /**
+   * Encapsulates/generates a request.
+   */
+  class Request {
+    private final String messageName;
+    private final Object request;
+    private final RPCContext context;
+    private final BinaryEncoder encoder;
+    private Message message;
+    private List<ByteBuffer> requestBytes;
+    
+    /**
+     * Creates a Request.
+     * @param messageName the name of the message to invoke.
+     * @param request the request data to send.
+     * @param context the RPC context to use.
+     */
+    public Request(String messageName, Object request, RPCContext context) {
+      this(messageName, request, context, null);
+    }
+    
+    /**
+     * Creates a Request.
+     * @param messageName the name of the message to invoke.
+     * @param request the request data to send.
+     * @param context the RPC context to use.
+     * @param encoder the BinaryEncoder to use to serialize the request.
+     */
+    public Request(String messageName, Object request, RPCContext context,
+                   BinaryEncoder encoder) {
+      this.messageName = messageName;
+      this.request = request;
+      this.context = context;
+      this.encoder =
+        ENCODER_FACTORY.binaryEncoder(new ByteBufferOutputStream(), encoder);
+    }
+    
+    /**
+     * Copy constructor.
+     * @param other Request from which to copy fields.
+     */
+    public Request(Request other) {
+      this.messageName = other.messageName;
+      this.request = other.request;
+      this.context = other.context;
+      this.encoder = other.encoder;
+    }
+    
+    /**
+     * Gets the message name.
+     * @return the message name.
+     */
+    public String getMessageName() {
+      return messageName;
+    }
+    
+    /**
+     * Gets the RPC context.
+     * @return the RPC context.
+     */
+    public RPCContext getContext() {
+      return context;
+    }
+    
+    /**
+     * Gets the Message associated with this request.
+     * @return this request's message.
+     */
+    public Message getMessage() {
+      if (message == null) {
+        message = getLocal().getMessages().get(messageName);
+        if (message == null) {
+          throw new AvroRuntimeException("Not a local message: "+messageName);
+        }
+      }
+      return message;
+    }
+    
+    /**
+     * Gets the request data, generating it first if necessary.
+     * @return the request data.
+     * @throws Exception if an error occurs generating the request data.
+     */
+    public List<ByteBuffer> getBytes() 
+      throws Exception {
+      if (requestBytes == null) {
+        ByteBufferOutputStream bbo = new ByteBufferOutputStream();
+        BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(bbo, encoder);
+
+        // use local protocol to write request
+        Message m = getMessage();
+        context.setMessage(m);
+
+        writeRequest(m.getRequest(), request, out); // write request payload
+
+        out.flush();
+        List<ByteBuffer> payload = bbo.getBufferList();
+
+        writeHandshake(out);                     // prepend handshake if needed
+
+        context.setRequestPayload(payload);
+        for (RPCPlugin plugin : rpcMetaPlugins) {
+          plugin.clientSendRequest(context);      // get meta-data from plugins
+        }
+        META_WRITER.write(context.requestCallMeta(), out);
 
+        out.writeString(m.getName());             // write message name
+
+        out.flush();
+        bbo.append(payload);
+
+        requestBytes = bbo.getBufferList();
+      }
+      return requestBytes;
+    }
+  }
+  
+  /**
+   * Encapsulates/parses a response.
+   */
+  class Response {
+    private final Request request;
+    private final BinaryDecoder in;
+    
+    /**
+     * Creates a Response.
+     * @param request the Request associated with this response.
+     */
+    public Response(Request request) {
+      this(request, null);
+    }
+    
+    /**
+     * Creates a Response.
+     * @param request the Request associated with this response.
+     * @param in the BinaryDecoder to use to deserialize the response.
+     */
+    public Response(Request request, BinaryDecoder in) {
+      this.request = request;
+      this.in = in;
+    }
+    
+    /**
+     * Gets the RPC response, reading/deserializing it first if necessary.
+     * @return the RPC response.
+     * @throws Exception if an error occurs reading/deserializing the response.
+     */
+    public Object getResponse() 
+      throws Exception {
+      Message lm = request.getMessage();
+      Message rm = remote.getMessages().get(request.getMessageName());
+      if (rm == null)
+        throw new AvroRuntimeException
+          ("Not a remote message: "+request.getMessageName());
+
+      Transceiver t = getTransceiver();
+      if ((lm.isOneWay() != rm.isOneWay()) && t.isConnected())
+        throw new AvroRuntimeException
+          ("Not both one-way messages: "+request.getMessageName());
+
+      if (lm.isOneWay() && t.isConnected()) return null; // one-way w/ handshake
+      
+      RPCContext context = request.getContext();
+      context.setResponseCallMeta(META_READER.read(null, in));
+
+      if (!in.readBoolean()) {                      // no error
+        Object response = readResponse(rm.getResponse(), lm.getResponse(), in);
+        context.setResponse(response);
+        for (RPCPlugin plugin : rpcMetaPlugins) {
+          plugin.clientReceiveResponse(context);
+        }
+        return response;
+
+      } else {
+        Exception error = readError(rm.getErrors(), lm.getErrors(), in);
+        context.setError(error);
+        for (RPCPlugin plugin : rpcMetaPlugins) {
+          plugin.clientReceiveResponse(context);
+        }
+        throw error;
+      }
+    }
+  }
+}

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java Thu Jun 16 09:33:29 2011
@@ -21,9 +21,8 @@ package org.apache.avro.ipc;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -62,12 +61,12 @@ public abstract class Responder {
   private static final ThreadLocal<Protocol> REMOTE =
     new ThreadLocal<Protocol>();
 
-  private Map<MD5,Protocol> protocols
-    = Collections.synchronizedMap(new HashMap<MD5,Protocol>());
+  private final Map<MD5,Protocol> protocols
+    = new ConcurrentHashMap<MD5,Protocol>();
 
-  private Protocol local;
-  private MD5 localHash;
-  protected List<RPCPlugin> rpcMetaPlugins;
+  private final Protocol local;
+  private final MD5 localHash;
+  protected final List<RPCPlugin> rpcMetaPlugins;
 
   protected Responder(Protocol local) {
     this.local = local;
@@ -75,7 +74,7 @@ public abstract class Responder {
     localHash.bytes(local.getMD5());
     protocols.put(localHash, local);
     this.rpcMetaPlugins =
-      Collections.synchronizedList(new ArrayList<RPCPlugin>());
+      new CopyOnWriteArrayList<RPCPlugin>();
   }
 
   /** Return the remote protocol.  Accesses a {@link ThreadLocal} that's set

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java Thu Jun 16 09:33:29 2011
@@ -22,21 +22,58 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.avro.Protocol;
 
 /** Base transport class used by {@link Requestor}. */
 public abstract class Transceiver implements Closeable {
+  private final ReentrantLock channelLock = new ReentrantLock();
 
   public abstract String getRemoteName();
+  
+  /**
+   * Acquires an exclusive lock on the transceiver's channel.
+   */
+  public void lockChannel() {
+    channelLock.lock();
+  }
+  
+  /**
+   * Releases the lock on the transceiver's channel if held by the calling thread.
+   */
+  public void unlockChannel() {
+    if (channelLock.isHeldByCurrentThread()) {
+      channelLock.unlock();
+    }
+  }
 
   /** Called by {@link Requestor#request(String,Object)} for two-way messages.
    * By default calls {@link #writeBuffers(List)} followed by
    * {@link #readBuffers()}. */
-  public synchronized List<ByteBuffer> transceive(List<ByteBuffer> request)
+  public List<ByteBuffer> transceive(List<ByteBuffer> request)
+    throws IOException {
+    lockChannel();
+    try {
+      writeBuffers(request);
+      return readBuffers();
+    } finally {
+      unlockChannel();
+    }
+  }
+  
+  /** 
+   * Called by {@link Requestor#request(String,Object,Callback)} for two-way messages using callbacks.
+   */
+  public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback)
     throws IOException {
-    writeBuffers(request);
-    return readBuffers();
+    // The default implementation works synchronously
+    try {
+      List<ByteBuffer> response = transceive(request);
+      callback.handleResult(response);
+    } catch (IOException e) {
+      callback.handleError(e);
+    }
   }
 
   /** Called by the default definition of {@link #transceive(List)}.*/

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java Thu Jun 16 09:33:29 2011
@@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericDa
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.Encoder;
+import org.apache.avro.ipc.Callback;
 import org.apache.avro.ipc.Requestor;
 import org.apache.avro.ipc.Transceiver;
 
@@ -53,6 +54,20 @@ public class GenericRequestor extends Re
   }
 
   @Override
+  public <T> void request(String messageName, Object request, Callback<T> callback)
+    throws IOException {
+    try {
+      super.request(messageName, request, callback);
+    } catch (Exception e) {
+      if (e instanceof RuntimeException)
+        throw (RuntimeException)e;
+      if (e instanceof IOException)
+        throw (IOException)e;
+      throw new AvroRemoteException(e);
+    }
+  }
+
+  @Override
   public void writeRequest(Schema schema, Object request, Encoder out)
     throws IOException {
     new GenericDatumWriter<Object>(schema).write(request, out);

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java Thu Jun 16 09:33:29 2011
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Type;
+import java.util.Arrays;
 
 import org.apache.avro.Protocol;
 import org.apache.avro.Schema;
@@ -32,6 +34,7 @@ import org.apache.avro.io.Decoder;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.ipc.Transceiver;
 import org.apache.avro.ipc.Requestor;
+import org.apache.avro.ipc.Callback;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
@@ -52,7 +55,20 @@ public class SpecificRequestor extends R
   @Override
   public Object invoke(Object proxy, Method method, Object[] args)
     throws Throwable {
-    return request(method.getName(), args);
+    // Check if this is a callback-based RPC:
+    Type[] parameterTypes = method.getParameterTypes();
+    if ((parameterTypes.length > 0) &&
+        (parameterTypes[parameterTypes.length - 1] instanceof Class) &&
+        Callback.class.isAssignableFrom(((Class<?>)parameterTypes[parameterTypes.length - 1]))) {
+      // Extract the Callback from the end of the argument list
+      Object[] finalArgs = Arrays.copyOf(args, args.length - 1);
+      Callback<?> callback = (Callback<?>)args[args.length - 1];
+      request(method.getName(), finalArgs, callback);
+      return null;
+    }
+    else {
+      return request(method.getName(), args);
+    }
   }
 
   protected DatumWriter<Object> getDatumWriter(Schema schema) {

Added: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java?rev=1136342&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java (added)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java Thu Jun 16 09:33:29 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro;
+
+import java.net.InetSocketAddress;
+
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.Transceiver;
+
+/**
+ * Protocol test with Netty server and transceiver
+ */
+public class TestProtocolNetty extends TestProtocolSpecific {
+  @Override
+  public Server createServer(Responder testResponder) throws Exception {
+    return new NettyServer(responder, new InetSocketAddress(0));
+  }
+  
+  @Override
+  public Transceiver createTransceiver() throws Exception{
+    return new NettyTransceiver(new InetSocketAddress(server.getPort()));
+  }
+  
+  @Override
+  protected int getExpectedHandshakeCount() {
+    return REPEATING;
+  }
+}

Added: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java?rev=1136342&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java (added)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java Thu Jun 16 09:33:29 2011
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.test.Simple;
+import org.apache.avro.test.TestError;
+import org.apache.avro.test.TestRecord;
+import org.apache.avro.util.Utf8;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests asynchronous RPCs with Netty.
+ */
+public class TestNettyServerWithCallbacks {
+  // Server under test and the client-side transceiver; both are assigned in
+  // initializeConnections() and closed in tearDownConnections().
+  private static Server server;
+  private static Transceiver transceiver;
+  // Client proxy for the Simple.Callback interface, obtained from SpecificRequestor.
+  private static Simple.Callback simpleClient;
+  // Inverted by SimpleImpl.ack() each time it is invoked.
+  private static final AtomicBoolean ackFlag = new AtomicBoolean(false);
+  // Counted down by SimpleImpl.ack(); the ack() test installs a fresh latch
+  // before its second call.
+  private static final AtomicReference<CountDownLatch> ackLatch = 
+    new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+  // Service implementation handed to the SpecificResponder in initializeConnections().
+  private static Simple simpleService = new SimpleImpl(ackFlag);
+  
+  /**
+   * Starts a NettyServer bound to port 0, prints the port it reports, and
+   * connects the shared transceiver and client proxy to that port.
+   */
+  @BeforeClass
+  public static void initializeConnections() throws Exception {
+    // Bring the server up first so that its port is known.
+    server = new NettyServer(
+        new SpecificResponder(Simple.class, simpleService),
+        new InetSocketAddress(0));
+    server.start();
+
+    final int port = server.getPort();
+    System.out.println("server port : " + port);
+
+    // Connect the client side to the port the server reported.
+    final InetSocketAddress serverAddress = new InetSocketAddress(port);
+    transceiver = new NettyTransceiver(serverAddress);
+    simpleClient = SpecificRequestor.getClient(Simple.Callback.class, transceiver);
+  }
+  
+  /**
+   * Closes the client transceiver and then the server, skipping whichever
+   * was never created.
+   *
+   * @throws Exception if closing either resource fails
+   */
+  @AfterClass
+  public static void tearDownConnections() throws Exception {
+    try {
+      if (transceiver != null) {
+        transceiver.close();
+      }
+    } finally {
+      // Close the server even when closing the transceiver throws, so a
+      // client-side failure cannot leave the server open.
+      if (server != null) {
+        server.close();
+      }
+    }
+  }
+  
+  @Test
+  public void greeting() throws Exception {
+    // Test synchronous RPC:
+    Assert.assertEquals(new Utf8("Hello, how are you?"), simpleClient.hello("how are you?"));
+    
+    // Test asynchronous RPC (future):
+    CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
+    simpleClient.hello("World!", future1);
+    Assert.assertEquals(new Utf8("Hello, World!"), future1.get(2, TimeUnit.SECONDS));
+    Assert.assertNull(future1.getError());
+    
+    // Test asynchronous RPC (callback):
+    final CallFuture<CharSequence> future2 = new CallFuture<CharSequence>();
+    simpleClient.hello("what's up?", new Callback<CharSequence>() {
+      @Override
+      public void handleResult(CharSequence result) {
+        future2.handleResult(result);
+      }
+      @Override
+      public void handleError(Throwable error) {
+        future2.handleError(error);
+      }
+    });
+    Assert.assertEquals(new Utf8("Hello, what's up?"), future2.get(2, TimeUnit.SECONDS));
+    Assert.assertNull(future2.getError());
+  }
+  
+  @Test
+  public void echo() throws Exception {
+    TestRecord record = new TestRecord();
+    record.hash = new org.apache.avro.test.MD5();
+    record.hash.bytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8 } );
+    record.kind = org.apache.avro.test.Kind.FOO;
+    record.name = "My Record";
+    
+    // Test synchronous RPC:
+    Assert.assertEquals(record, simpleClient.echo(record));
+    
+    // Test asynchronous RPC (future):
+    CallFuture<TestRecord> future1 = new CallFuture<TestRecord>();
+    simpleClient.echo(record, future1);
+    Assert.assertEquals(record, future1.get(2, TimeUnit.SECONDS));
+    Assert.assertNull(future1.getError());
+    
+    // Test asynchronous RPC (callback):
+    final CallFuture<TestRecord> future2 = new CallFuture<TestRecord>();
+    simpleClient.echo(record, new Callback<TestRecord>() {
+      @Override
+      public void handleResult(TestRecord result) {
+        future2.handleResult(result);
+      }
+      @Override
+      public void handleError(Throwable error) {
+        future2.handleError(error);
+      }
+    });
+    Assert.assertEquals(record, future2.get(2, TimeUnit.SECONDS));
+    Assert.assertNull(future2.getError());
+  }
+  
+  @Test
+  public void add() throws Exception {
+    // Test synchronous RPC:
+    Assert.assertEquals(8, simpleClient.add(2, 6));
+    
+    // Test asynchronous RPC (future):
+    CallFuture<Integer> future1 = new CallFuture<Integer>();
+    simpleClient.add(8, 8, future1);
+    Assert.assertEquals(new Integer(16), future1.get(2, TimeUnit.SECONDS));
+    Assert.assertNull(future1.getError());
+    
+    // Test asynchronous RPC (callback):
+    final CallFuture<Integer> future2 = new CallFuture<Integer>();
+    simpleClient.add(512, 256, new Callback<Integer>() {
+      @Override
+      public void handleResult(Integer result) {
+        future2.handleResult(result);
+      }
+      @Override
+      public void handleError(Throwable error) {
+        future2.handleError(error);
+      }
+    });
+    Assert.assertEquals(new Integer(768), future2.get(2, TimeUnit.SECONDS));
+    Assert.assertNull(future2.getError());
+  }
+  
+  @Test
+  public void echoBytes() throws Exception {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 });
+    
+    // Test synchronous RPC:
+    Assert.assertEquals(byteBuffer, simpleClient.echoBytes(byteBuffer));
+    
+    // Test asynchronous RPC (future):
+    CallFuture<ByteBuffer> future1 = new CallFuture<ByteBuffer>();
+    simpleClient.echoBytes(byteBuffer, future1);
+    Assert.assertEquals(byteBuffer, future1.get(2, TimeUnit.SECONDS));
+    Assert.assertNull(future1.getError());
+    
+    // Test asynchronous RPC (callback):
+    final CallFuture<ByteBuffer> future2 = new CallFuture<ByteBuffer>();
+    simpleClient.echoBytes(byteBuffer, new Callback<ByteBuffer>() {
+      @Override
+      public void handleResult(ByteBuffer result) {
+        future2.handleResult(result);
+      }
+      @Override
+      public void handleError(Throwable error) {
+        future2.handleError(error);
+      }
+    });
+    Assert.assertEquals(byteBuffer, future2.get(2, TimeUnit.SECONDS));
+    Assert.assertNull(future2.getError());
+  }
+  
+  @Test()
+  public void error() throws IOException, InterruptedException, TimeoutException {
+    // Test synchronous RPC:
+    try {
+      simpleClient.error();
+      Assert.fail("Expected " + TestError.class.getCanonicalName());
+    } catch (TestError e) {
+      // Expected
+    } catch (AvroRemoteException e) {
+      e.printStackTrace();
+      Assert.fail("Unexpected error: " + e.toString());
+    }
+    
+    // Test asynchronous RPC (future):
+    CallFuture<Void> future = new CallFuture<Void>();
+    simpleClient.error(future);
+    try {
+      future.get(2, TimeUnit.SECONDS);
+      Assert.fail("Expected " + TestError.class.getCanonicalName() + " to be thrown");
+    } catch (ExecutionException e) {
+      Assert.assertTrue("Expected " + TestError.class.getCanonicalName(), 
+          e.getCause() instanceof TestError);
+    }
+    Assert.assertNotNull(future.getError());
+    Assert.assertTrue("Expected " + TestError.class.getCanonicalName(), 
+        future.getError() instanceof TestError);
+    Assert.assertNull(future.getResult());
+    
+    // Test asynchronous RPC (callback):
+    final CountDownLatch latch = new CountDownLatch(1);
+    final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
+    simpleClient.error(new Callback<Void>() {
+      @Override
+      public void handleResult(Void result) {
+        Assert.fail("Expected " + TestError.class.getCanonicalName());
+      }
+      @Override
+      public void handleError(Throwable error) {
+        errorRef.set(error);
+        latch.countDown();
+      }
+    });
+    Assert.assertTrue("Timed out waiting for error", latch.await(2, TimeUnit.SECONDS));
+    Assert.assertNotNull(errorRef.get());
+    Assert.assertTrue(errorRef.get() instanceof TestError);
+  }
+  
+  /**
+   * Calls ack() twice and checks that the shared flag is set after the first
+   * call and cleared after the second, waiting on the shared latch each time.
+   */
+  @Test
+  public void ack() throws Exception {
+    simpleClient.ack();
+    // await() returns false on timeout; assert it so a missing ack is reported
+    // as a timeout instead of as a wrong flag value.
+    Assert.assertTrue("Timed out waiting for first ack",
+        ackLatch.get().await(2, TimeUnit.SECONDS));
+    Assert.assertTrue("Expected ack flag to be set", ackFlag.get());
+
+    // Install a fresh latch so the second call can be awaited separately.
+    ackLatch.set(new CountDownLatch(1));
+    simpleClient.ack();
+    Assert.assertTrue("Timed out waiting for second ack",
+        ackLatch.get().await(2, TimeUnit.SECONDS));
+    Assert.assertFalse("Expected ack flag to be cleared", ackFlag.get());
+  }
+  
+  /**
+   * Throughput benchmark, marked with @Ignore: eight worker threads issue
+   * synchronous hello() calls through the shared client for ten seconds, after
+   * which the call count and the derived rates are printed.
+   */
+  @Ignore
+  @Test
+  public void performanceTest() throws Exception {
+    final int threadCount = 8;
+    final long runTimeMillis = 10 * 1000L;
+    ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);
+    
+    System.out.println("Running performance test for " + runTimeMillis + "ms...");
+    final AtomicLong rpcCount = new AtomicLong(0L);
+    // Workers keep issuing calls for as long as this flag stays true.
+    final AtomicBoolean runFlag = new AtomicBoolean(true);
+    // One count per worker, so waiters are released once all workers have checked in.
+    final CountDownLatch startLatch = new CountDownLatch(threadCount);
+    for (int ii = 0; ii < threadCount; ii++) {
+      threadPool.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            // Check in, then wait (at most 2 seconds) for the other workers.
+            startLatch.countDown();
+            startLatch.await(2, TimeUnit.SECONDS);
+            while (runFlag.get()) {
+              // The counter is bumped before the call is made, so it counts attempts.
+              rpcCount.incrementAndGet();
+              Assert.assertEquals(new Utf8("Hello, World!"), simpleClient.hello("World!"));
+            }
+          } catch (Exception e) {
+            // Only printed; this catch does not cover Errors such as a failed assertion.
+            e.printStackTrace();
+          }
+        }
+      });
+    }
+    
+    // Wait for the workers to check in, let them run, then signal them to stop.
+    startLatch.await(2, TimeUnit.SECONDS);
+    Thread.sleep(runTimeMillis);
+    runFlag.set(false);
+    threadPool.shutdown();
+    Assert.assertTrue("Timed out shutting down thread pool", threadPool.awaitTermination(2, TimeUnit.SECONDS));
+    // Rates are computed against the nominal run time, not a measured one.
+    System.out.println("Completed " + rpcCount.get() + " RPCs in " + runTimeMillis + 
+        "ms => " + (((double)rpcCount.get() / (double)runTimeMillis) * 1000) + " RPCs/sec, " + 
+        ((double)runTimeMillis / (double)rpcCount.get()) + " ms/RPC.");
+  }
+  
+  /**
+   * Simple service implementation used as the server side of these tests.
+   */
+  private static class SimpleImpl implements Simple {
+    private final AtomicBoolean ackFlag;
+
+    /**
+     * Creates a SimpleImpl.
+     * @param ackFlag the flag that is inverted on every ack() call.
+     */
+    public SimpleImpl(final AtomicBoolean ackFlag) {
+      this.ackFlag = ackFlag;
+    }
+
+    /** Returns the greeting prefixed with "Hello, ". */
+    @Override
+    public CharSequence hello(CharSequence greeting) throws AvroRemoteException {
+      return "Hello, ".concat(String.valueOf(greeting));
+    }
+
+    /** Returns the record it was given. */
+    @Override
+    public TestRecord echo(TestRecord record) throws AvroRemoteException {
+      return record;
+    }
+
+    /** Returns the sum of the two arguments. */
+    @Override
+    public int add(int arg1, int arg2) throws AvroRemoteException {
+      final int sum = arg1 + arg2;
+      return sum;
+    }
+
+    /** Returns the buffer it was given. */
+    @Override
+    public ByteBuffer echoBytes(ByteBuffer data) throws AvroRemoteException {
+      return data;
+    }
+
+    /** Always throws a TestError whose message is "Test Message". */
+    @Override
+    public Void error() throws AvroRemoteException, TestError {
+      final TestError failure = new TestError();
+      failure.message = "Test Message";
+      throw failure;
+    }
+
+    /**
+     * Inverts the ack flag and then counts down the latch currently held by
+     * the enclosing test class.
+     */
+    @Override
+    public synchronized void ack() {
+      final boolean flipped = !ackFlag.get();
+      ackFlag.set(flipped);
+      ackLatch.get().countDown();
+    }
+  }
+}



Mime
View raw message