avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1056494 - in /avro/trunk: ./ lang/java/ipc/src/main/java/org/apache/avro/ipc/ lang/java/ipc/src/test/java/org/apache/avro/ lang/java/ipc/src/test/java/org/apache/avro/ipc/
Date Fri, 07 Jan 2011 20:36:11 GMT
Author: cutting
Date: Fri Jan  7 20:36:11 2011
New Revision: 1056494

URL: http://svn.apache.org/viewvc?rev=1056494&view=rev
Log:
AVRO-708. Java: Fix Netty-based RPC to keep connection open.  Contributed by Stephen Gargan.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolDatagram.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestLocalTransceiver.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1056494&r1=1056493&r2=1056494&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Jan  7 20:36:11 2011
@@ -118,6 +118,9 @@ Avro 1.5.0 (unreleased)
     AVRO-722. Java: Fix ordering of calls to RPC plugins.
     (Stephen Gargan via cutting)
 
+    AVRO-708. Java: Fix Netty-based RPC to keep connection open.
+    (Stephen Gargan via cutting)
+
 Avro 1.4.1 (13 October 2010)
 
   NEW FEATURES

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java?rev=1056494&r1=1056493&r2=1056494&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java Fri Jan  7
20:36:11 2011
@@ -109,6 +109,8 @@ public class NettyServer implements Serv
    */
   class NettyServerAvroHandler extends SimpleChannelUpstreamHandler {
 
+    private NettyTransceiver connectionMetadata = new NettyTransceiver();
+    
     @Override
     public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
         throws Exception {
@@ -130,13 +132,15 @@ public class NettyServer implements Serv
       try {
         NettyDataPack dataPack = (NettyDataPack) e.getMessage();
         List<ByteBuffer> req = dataPack.getDatas();
-        List<ByteBuffer> res = responder.respond(req);
+        List<ByteBuffer> res = responder.respond(req, connectionMetadata);
         dataPack.setDatas(res);
         e.getChannel().write(dataPack);
       } catch (IOException ex) {
         LOG.warn("unexpect error");
       } finally {
-        e.getChannel().close();
+        if(!connectionMetadata.isConnected()){
+          e.getChannel().close();
+        }
       }
     }
 

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1056494&r1=1056493&r2=1056494&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Fri Jan
 7 20:36:11 2011
@@ -69,6 +69,8 @@ public class NettyTransceiver extends Tr
     new ConcurrentHashMap<Integer, CallFuture>();
   
   private Protocol remote;
+
+  NettyTransceiver() {}
   
   public NettyTransceiver(InetSocketAddress addr) {
     // Set up.

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolDatagram.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolDatagram.java?rev=1056494&r1=1056493&r2=1056494&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolDatagram.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolDatagram.java Fri Jan
 7 20:36:11 2011
@@ -17,28 +17,32 @@
  */
 package org.apache.avro;
 
+import java.net.InetSocketAddress;
 import java.util.Random;
+
 import org.apache.avro.ipc.DatagramServer;
 import org.apache.avro.ipc.DatagramTransceiver;
-import org.apache.avro.specific.SpecificRequestor;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.Transceiver;
 import org.apache.avro.specific.SpecificResponder;
 import org.apache.avro.test.Simple;
-import org.junit.Before;
-
-import java.net.InetSocketAddress;
 
 public class TestProtocolDatagram extends TestProtocolSpecific {
-
-  @Before @Override
-  public void testStartServer() throws Exception {
-    if (server != null) return;
-    server =
-      new DatagramServer(new SpecificResponder(Simple.class, new TestImpl()),
-                         new InetSocketAddress("localhost",
-                                               new Random().nextInt(10000)+10000));
-    server.start();
-    client = new DatagramTransceiver(new InetSocketAddress("localhost", server.getPort()));
-    proxy = SpecificRequestor.getClient(Simple.class, client);
+  @Override
+  public Server createServer(Responder testResponder) throws Exception {
+    return  new DatagramServer(new SpecificResponder(Simple.class, new TestImpl()),
+        new InetSocketAddress("localhost",
+            new Random().nextInt(10000)+10000));
+  }
+  
+  @Override
+  public Transceiver createTransceiver() throws Exception{
+    return new DatagramTransceiver(new InetSocketAddress("localhost", server.getPort()));
   }
 
+  @Override
+  protected int getExpectedHandshakeCount() {
+    return 0;
+  }
 }

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java?rev=1056494&r1=1056493&r2=1056494&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java Fri Jan 
7 20:36:11 2011
@@ -17,14 +17,15 @@
  */
 package org.apache.avro;
 
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.ipc.Responder;
 import org.apache.avro.ipc.HttpServer;
 import org.apache.avro.ipc.HttpTransceiver;
 import org.apache.avro.specific.SpecificRequestor;
-import org.apache.avro.specific.SpecificResponder;
 import org.apache.avro.test.Simple;
 
 import org.junit.Test;
-import org.junit.Before;
 
 import java.net.URL;
 import java.net.ServerSocket;
@@ -33,19 +34,22 @@ import java.lang.reflect.UndeclaredThrow
 
 public class TestProtocolHttp extends TestProtocolSpecific {
 
-  @Before @Override
-  public void testStartServer() throws Exception {
-    if (server != null) return;
-    server =
-      new HttpServer(new SpecificResponder(Simple.class, new TestImpl()), 0);
-    server.start();
-    client =
-      new HttpTransceiver(new URL("http://127.0.0.1:"+server.getPort()+"/"));
-    proxy = SpecificRequestor.getClient(Simple.class, client);
+  @Override
+  public Server createServer(Responder testResponder) throws Exception {
+    return new HttpServer(testResponder, 0);
+  }
+  
+  @Override
+  public Transceiver createTransceiver() throws Exception{
+    return new HttpTransceiver(new URL("http://127.0.0.1:"+server.getPort()+"/"));
+  }
+ 
+  protected int getExpectedHandshakeCount() {
+    return REPEATING;
   }
 
   @Test(expected=SocketTimeoutException.class)
-  public void testTimeout() throws Throwable {
+    public void testTimeout() throws Throwable {
     ServerSocket s = new ServerSocket(0);
     HttpTransceiver client =
       new HttpTransceiver(new URL("http://127.0.0.1:"+s.getLocalPort()+"/"));

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java?rev=1056494&r1=1056493&r2=1056494&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java Fri Jan
 7 20:36:11 2011
@@ -18,6 +18,10 @@
 package org.apache.avro;
 
 import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.RPCContext;
+import org.apache.avro.ipc.RPCPlugin;
+import org.apache.avro.ipc.Requestor;
+import org.apache.avro.ipc.Responder;
 import org.apache.avro.ipc.Server;
 import org.apache.avro.ipc.SocketServer;
 import org.apache.avro.ipc.SocketTransceiver;
@@ -30,11 +34,12 @@ import org.apache.avro.test.MD5;
 import org.apache.avro.test.TestError;
 import org.apache.avro.test.TestRecord;
 import org.apache.avro.util.Utf8;
+import org.junit.After;
 import org.junit.Test;
 import org.junit.Before;
 import org.junit.AfterClass;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+
+import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.FileReader;
@@ -48,6 +53,7 @@ import java.util.Random;
 
 public class TestProtocolSpecific {
 
+  protected static final int REPEATING = -1;
   protected static final File SERVER_PORTS_DIR
   = new File(System.getProperty("test.dir", "/tmp")+"/server-ports/");
 
@@ -75,14 +81,35 @@ public class TestProtocolSpecific {
   protected static Transceiver client;
   protected static Simple proxy;
 
+  protected static SpecificResponder responder;
+
+  protected static HandshakeMonitor monitor;
+
   @Before
   public void testStartServer() throws Exception {
     if (server != null) return;
-    server = new SocketServer(new SpecificResponder(Simple.class, new TestImpl()),
-                              new InetSocketAddress(0));
+    responder = new SpecificResponder(Simple.class, new TestImpl());
+    server = createServer(responder);
     server.start();
-    client = new SocketTransceiver(new InetSocketAddress(server.getPort()));
-    proxy = SpecificRequestor.getClient(Simple.class, client);
+    
+    client = createTransceiver();
+    SpecificRequestor req = new SpecificRequestor(Simple.class, client);
+    addRpcPlugins(req);
+    proxy = SpecificRequestor.getClient(Simple.class, (SpecificRequestor)req);
+    
+    monitor = new HandshakeMonitor();
+    responder.addRPCPlugin(monitor);
+  }
+  
+  public void addRpcPlugins(Requestor requestor){}
+  
+  public Server createServer(Responder testResponder) throws Exception{
+    return server = new SocketServer(testResponder,
+                              new InetSocketAddress(0));   
+  }
+  
+  public Transceiver createTransceiver() throws Exception{
+    return new SocketTransceiver(new InetSocketAddress(server.getPort()));
   }
 
   @Test public void testGetRemote() throws IOException {
@@ -175,11 +202,49 @@ public class TestProtocolSpecific {
     try { Thread.sleep(100); } catch (InterruptedException e) {}
     assertEquals(2, ackCount);
   }
+  
+  @Test
+  public void testRepeatedAccess() throws Exception {
+    for (int x = 0; x < 1000; x++) {
+      proxy.hello("hi!");
+    }
+  }
+  
+  @After
+  public void testHandshakeOccursOnce() throws IOException{
+    monitor.assertHandshake();
+  }
 
   @AfterClass
   public static void testStopServer() throws IOException {
     client.close();
     server.close();
+    server = null;
+  }
+  
+  public class HandshakeMonitor extends RPCPlugin{
+    
+    private int handshakes;
+    
+    @Override
+    public void serverConnecting(RPCContext context) {
+      handshakes++;
+      int expected = getExpectedHandshakeCount();
+      if(expected > 0  && handshakes > expected){
+        throw new IllegalStateException("Expected number of Protocol negotiation handshakes
exceeded expected "+expected+" was "+handshakes);
+      }
+    }
+    
+    public void assertHandshake(){
+      int expected = getExpectedHandshakeCount();
+      if(expected != REPEATING){
+        assertEquals("Expected number of handshakes did not take place.", expected, handshakes);
+      }
+    }
+  }
+  
+  protected int getExpectedHandshakeCount() {
+   return 1;
   }
 
   public static class InteropTest {

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java?rev=1056494&r1=1056493&r2=1056494&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java Fri
Jan  7 20:36:11 2011
@@ -19,29 +19,30 @@ package org.apache.avro;
 
 import java.net.InetSocketAddress;
 
+import org.apache.avro.ipc.Requestor;
 import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Server;
 import org.apache.avro.ipc.SocketServer;
 import org.apache.avro.ipc.SocketTransceiver;
-import org.apache.avro.specific.SpecificRequestor;
-import org.apache.avro.specific.SpecificResponder;
-import org.apache.avro.test.Simple;
-import org.junit.Before;
+import org.apache.avro.ipc.Transceiver;
+
 
 public class TestProtocolSpecificMeta extends TestProtocolSpecific {
   
-  @Before @Override
-  public void testStartServer() throws Exception {
-    if (server != null) return;
-    Responder responder = new SpecificResponder(Simple.class, new TestImpl());
+  @Override
+  public Server createServer(Responder testResponder) throws Exception {
     responder.addRPCPlugin(new RPCMetaTestPlugin("key1"));
     responder.addRPCPlugin(new RPCMetaTestPlugin("key2"));
-    server = new SocketServer(responder, new InetSocketAddress(0));
-    server.start();
-    
-    client = new SocketTransceiver(new InetSocketAddress(server.getPort()));
-    SpecificRequestor req = new SpecificRequestor(Simple.class, client);
+    return new SocketServer(responder, new InetSocketAddress(0));
+  }
+  
+  @Override
+  public Transceiver createTransceiver() throws Exception {
+    return new SocketTransceiver(new InetSocketAddress(server.getPort()));
+  }
+  
+  public void addRpcPlugins(Requestor req){
     req.addRPCPlugin(new RPCMetaTestPlugin("key1"));
     req.addRPCPlugin(new RPCMetaTestPlugin("key2"));
-    proxy = SpecificRequestor.getClient(Simple.class, (SpecificRequestor)req);
   }
 }

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestLocalTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestLocalTransceiver.java?rev=1056494&r1=1056493&r2=1056494&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestLocalTransceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestLocalTransceiver.java Fri
Jan  7 20:36:11 2011
@@ -58,7 +58,9 @@ public class TestLocalTransceiver {
         "m").getRequest());
     params.put("x", new Utf8("hello"));
     GenericRequestor r = new GenericRequestor(protocol, t);
-    assertEquals(new Utf8("there"), r.request("m", params));
+    
+    for(int x = 0; x < 5; x++)
+      assertEquals(new Utf8("there"), r.request("m", params));
   }
 
 }

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java?rev=1056494&r1=1056493&r2=1056494&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java Fri Jan
 7 20:36:11 2011
@@ -62,11 +62,13 @@ public class TestNettyServer {
     msg.body = new Utf8("I love you!");
 
     try {
-      CharSequence result = proxy.send(msg);
-      System.out.println("Result: " + result);
-      Assert.assertEquals(
-          "Sent message to [wife] from [husband] with body [I love you!]",
-          result.toString());
+      for(int x = 0; x < 5; x++) {
+        CharSequence result = proxy.send(msg);
+        System.out.println("Result: " + result);
+        Assert.assertEquals(
+            "Sent message to [wife] from [husband] with body [I love you!]",
+            result.toString());
+      }
     } finally {
       transceiver.close();
       server.close();



Mime
View raw message