avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sgar...@apache.org
Subject svn commit: r1099257 - in /avro/trunk: lang/java/ipc/src/main/java/org/apache/avro/ipc/ lang/java/ipc/src/test/java/org/apache/avro/ipc/ share/test/schemas/
Date Tue, 03 May 2011 21:15:53 GMT
Author: sgargan
Date: Tue May  3 21:15:52 2011
New Revision: 1099257

URL: http://svn.apache.org/viewvc?rev=1099257&view=rev
Log:
AVRO-815. NettyTransceiver fails when processing one-way messages

Implemented the writeBuffers call in the NettyTransceiver to send a NettyDataPack, similar to the
request/response call. Added a one-way message definition to the test mail.avpr protocol file and
updated the unit test to verify the one-way behavior.


Modified:
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java
    avro/trunk/share/test/schemas/mail.avpr

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java?rev=1099257&r1=1099256&r2=1099257&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java Tue May  3
21:15:52 2011
@@ -133,8 +133,11 @@ public class NettyServer implements Serv
         NettyDataPack dataPack = (NettyDataPack) e.getMessage();
         List<ByteBuffer> req = dataPack.getDatas();
         List<ByteBuffer> res = responder.respond(req, connectionMetadata);
-        dataPack.setDatas(res);
-        e.getChannel().write(dataPack);
+		// response will be null for oneway messages.
+        if(res != null) {
+          dataPack.setDatas(res);
+          e.getChannel().write(dataPack);          
+        }
       } catch (IOException ex) {
         LOG.warn("unexpect error");
       } finally {

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1099257&r1=1099256&r2=1099257&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Tue May
 3 21:15:52 2011
@@ -144,7 +144,7 @@ public class NettyTransceiver extends Tr
 
   @Override
   public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
-    throw new UnsupportedOperationException();
+    channel.write(new NettyDataPack(serialGenerator.incrementAndGet(), buffers));
   }
 
   @Override

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java?rev=1099257&r1=1099256&r2=1099257&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java Tue May
 3 21:15:52 2011
@@ -18,7 +18,11 @@
 
 package org.apache.avro.ipc;
 
+import static org.junit.Assert.assertEquals;
+
 import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
@@ -27,52 +31,109 @@ import org.apache.avro.ipc.specific.Spec
 import org.apache.avro.test.Mail;
 import org.apache.avro.test.Message;
 import org.apache.avro.util.Utf8;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestNettyServer {
 
+  private static Server server;
+  private static Transceiver transceiver;
+  private static Mail proxy;
+  private static MailImpl mailService;
+
   public static class MailImpl implements Mail {
+
+    private CountDownLatch allMessages = new CountDownLatch(5);
+    
     // in this simple example just return details of the message
     public CharSequence send(Message message) {
       return new Utf8("Sent message to [" + message.to.toString() + "] from ["
           + message.from.toString() + "] with body [" + message.body.toString()
           + "]");
     }
-  }
+    
+    public void fireandforget(Message message) {
+      allMessages.countDown();
+    }
+    
+    private void awaitMessages() throws InterruptedException {
+      allMessages.await(2, TimeUnit.SECONDS);
+    }
+    
+    private void assertAllMessagesReceived() {
+      assertEquals(0, allMessages.getCount());
+    }
 
-  @Test
-  public void test() throws Exception {
+    public void reset() {
+      allMessages = new CountDownLatch(5);      
+    }
+  }
+  
+  @BeforeClass
+  public static void initializeConnections()throws Exception {
     // start server
     System.out.println("starting server...");
-    Responder responder = new SpecificResponder(Mail.class, new MailImpl());
-    Server server = new NettyServer(responder, new InetSocketAddress(0));
+    mailService = new MailImpl();
+    Responder responder = new SpecificResponder(Mail.class, mailService);
+    server = new NettyServer(responder, new InetSocketAddress(0));
     server.start();
-
+  
     int serverPort = server.getPort();
     System.out.println("server port : " + serverPort);
 
-    // client
-    Transceiver transceiver = new NettyTransceiver(new InetSocketAddress(
+    transceiver = new NettyTransceiver(new InetSocketAddress(
         serverPort));
-    Mail proxy = SpecificRequestor.getClient(Mail.class, transceiver);
+    proxy = SpecificRequestor.getClient(Mail.class, transceiver);
+  }
+  
+  @AfterClass
+  public static void tearDownConnections() throws Exception{
+    transceiver.close();
+    server.close();
+  }
 
+  @Test
+  public void testRequestResponse() throws Exception {
+      for(int x = 0; x < 5; x++) {
+        verifyResponse(proxy.send(createMessage()));
+      }
+  }
+
+  private void verifyResponse(CharSequence result) {
+    Assert.assertEquals(
+        "Sent message to [wife] from [husband] with body [I love you!]",
+        result.toString());
+  }
+  
+  @Test
+  public void testOneway() throws Exception {
+    for (int x = 0; x < 5; x++) {
+      proxy.fireandforget(createMessage());
+    }
+    mailService.awaitMessages();
+    mailService.assertAllMessagesReceived();
+  }
+  
+  @Test
+  public void testMixtureOfRequests() throws Exception {
+    mailService.reset();
+    for (int x = 0; x < 5; x++) {
+      Message createMessage = createMessage();
+      proxy.fireandforget(createMessage);
+      verifyResponse(proxy.send(createMessage));
+    }
+    mailService.awaitMessages();
+    mailService.assertAllMessagesReceived();
+
+  }
+
+  private Message createMessage() {
     Message msg = new Message();
     msg.to = new Utf8("wife");
     msg.from = new Utf8("husband");
     msg.body = new Utf8("I love you!");
-
-    try {
-      for(int x = 0; x < 5; x++) {
-        CharSequence result = proxy.send(msg);
-        System.out.println("Result: " + result);
-        Assert.assertEquals(
-            "Sent message to [wife] from [husband] with body [I love you!]",
-            result.toString());
-      }
-    } finally {
-      transceiver.close();
-      server.close();
-    }
+    return msg;
   }
 
 }

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java?rev=1099257&r1=1099256&r2=1099257&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java
(original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java
Tue May  3 21:15:52 2011
@@ -94,5 +94,7 @@ public class TestRpcPluginOrdering {
     public CharSequence send(Message message) throws AvroRemoteException {
       return new Utf8("Received");
     }
+    public void fireandforget(Message message) {
+    }
   }
 }

Modified: avro/trunk/share/test/schemas/mail.avpr
URL: http://svn.apache.org/viewvc/avro/trunk/share/test/schemas/mail.avpr?rev=1099257&r1=1099256&r2=1099257&view=diff
==============================================================================
--- avro/trunk/share/test/schemas/mail.avpr (original)
+++ avro/trunk/share/test/schemas/mail.avpr Tue May  3 21:15:52 2011
@@ -15,6 +15,12 @@
      "send": {
          "request": [{"name": "message", "type": "Message"}],
          "response": "string"
+     },
+     "fireandforget": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "null",
+         "one-way": true
      }
+
  }
-}
\ No newline at end of file
+}



Mime
View raw message