avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r958149 - in /avro/trunk: ./ lang/java/src/java/org/apache/avro/ipc/ lang/java/src/test/java/org/apache/avro/
Date Fri, 25 Jun 2010 23:40:38 GMT
Author: cutting
Date: Fri Jun 25 23:40:37 2010
New Revision: 958149

URL: http://svn.apache.org/viewvc?rev=958149&view=rev
Log:
AVRO-578. Java: add payload data to RPC context for use by plugins.  Contributed by Patrick Wendell.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCContext.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCPlugin.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/RPCMetaTestPlugin.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Jun 25 23:40:37 2010
@@ -22,6 +22,9 @@ Avro 1.4.0 (unreleased)
     AVRO-567. Add command-line tools for text file import & export.
     (Patrick Wendell via cutting)
 
+    AVRO-578. Java: add payload data to RPC context for use by
+    plugins.  (Patrick Wendell via cutting)
+
   IMPROVEMENTS
     AVRO-584. Update Histogram for Stats Plugin
     (Patrick Wendell via philz)

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java Fri Jun 25 23:40:37 2010
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 
 /** Utility to collect data written to an {@link OutputStream} in {@link
@@ -44,8 +44,24 @@ public class ByteBufferOutputStream exte
     return result;
   }
 
+  /** Prepend a list of ByteBuffers to this stream. */
+  public void prepend(List<ByteBuffer> lists) {
+    for (ByteBuffer buffer: lists) {
+      buffer.position(buffer.limit());
+    }
+    buffers.addAll(0, lists);
+  }
+  
+  /** Append a list of ByteBuffers to this stream. */
+  public void append(List<ByteBuffer> lists) {
+    for (ByteBuffer buffer: lists) {
+      buffer.position(buffer.limit());
+    }
+    buffers.addAll(lists);
+  }
+  
   public void reset() {
-    buffers = new ArrayList<ByteBuffer>(1);
+    buffers = new LinkedList<ByteBuffer>();
     buffers.add(ByteBuffer.allocate(BUFFER_SIZE));
   }
 

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCContext.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCContext.java?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCContext.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCContext.java Fri Jun 25 23:40:37 2010
@@ -19,6 +19,7 @@ package org.apache.avro.ipc;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.avro.Protocol.Message;
@@ -28,7 +29,9 @@ import org.apache.avro.util.Utf8;
  * This class represents the context of an RPC call or RPC handshake.
  * Designed to provide information to RPC plugin writers,
  * this class encapsulates information about the rpc exchange,
- * including handshake and call metadata.
+ * including handshake and call metadata. Note: this data includes
+ * full copies of the RPC payload, so plugins which store RPCContexts
+ * beyond the life of each call should be conscious of memory use.
  *
  */
 public class RPCContext {
@@ -39,6 +42,8 @@ public class RPCContext {
   protected Object response;
   protected Exception error;
   private Message message;
+  List<ByteBuffer> requestPayload;
+  List<ByteBuffer> responsePayload;
   
   /**
    * This is an access method for the handshake state
@@ -148,10 +153,44 @@ public class RPCContext {
   public boolean isError() {
     return error != null;
   }
-
+  
+  /** Sets the {@link Message} corresponding to this RPC */
   public void setMessage(Message message) {
     this.message = message;    
   }
   
+  /** Returns the {@link Message} corresponding to this RPC
+   * @return this RPC's {@link Message} 
+   */
   public Message getMessage() { return message; }
+  
+  /** Sets the serialized payload of the request in this RPC. Will
+   * not include handshake or meta-data. */
+  public void setRequestPayload(List<ByteBuffer> payload) {
+    this.requestPayload = payload;
+  }
+ 
+  /** Returns the serialized payload of the request in this RPC. Will
+   * not include handshake or meta-data. If the request payload has not been
+   * set yet, returns null.
+   * 
+   * @return this RPC's request payload.*/
+  public List<ByteBuffer> getRequestPayload() {
+    return this.requestPayload;
+  }
+  
+  /** Returns the serialized payload of the response in this RPC. Will
+   * not include handshake or meta-data. If the response payload has not been
+   * set yet, returns null.
+   * 
+   * @return this RPC's response payload.*/
+  public List<ByteBuffer> getResponsePayload() {
+    return this.responsePayload;
+  }
+  
+  /** Sets the serialized payload of the response in this RPC. Will
+   * not include handshake or meta-data. */
+  public void setResponsePayload(List<ByteBuffer> payload) {
+    this.responsePayload = payload;
+  }
 }

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCPlugin.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCPlugin.java?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCPlugin.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCPlugin.java Fri Jun 25 23:40:37 2010
@@ -43,12 +43,13 @@ public class RPCPlugin {
    * @param context the handshake rpc context
    */
   public void clientFinishConnect(RPCContext context) { }
-  
+
   /**
    * This method is invoked at the client before it issues the RPC call.
    * @param context the per-call rpc context (in/out parameter)
    */
   public void clientSendRequest(RPCContext context) { }
+ 
   
   /**
    * This method is invoked at the RPC server when the request is received,
@@ -58,17 +59,18 @@ public class RPCPlugin {
   public void serverReceiveRequest(RPCContext context) { }
   
   /**
-   * This method is invoked at the server after the call is executed,
-   * but before the response is returned to the client
+   * This method is invoked at the server before the response is executed,
+   * but before the response has been formulated
    * @param context the per-call rpc context (in/out parameter)
    */
   public void serverSendResponse(RPCContext context) { }
-    
+  
   /**
    * This method is invoked at the client after the call is executed,
    * and after the client receives the response
    * @param context the per-call rpc context
    */
   public void clientReceiveResponse(RPCContext context) { }
+
   
 }

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java Fri Jun 25 23:40:37 2010
@@ -92,28 +92,36 @@ public abstract class Requestor {
     do {
       ByteBufferOutputStream bbo = new ByteBufferOutputStream();
       Encoder out = new BinaryEncoder(bbo);
-
-      writeHandshake(out);                      // prepend handshake if needed
-
+      
       // use local protocol to write request
       m = getLocal().getMessages().get(messageName);
       if (m == null)
         throw new AvroRuntimeException("Not a local message: "+messageName);
       context.setMessage(m);
+    
+      writeRequest(m.getRequest(), request, out); // write request payload
+      List<ByteBuffer> payload = bbo.getBufferList();
       
+      context.setRequestPayload(payload);
       for (RPCPlugin plugin : rpcMetaPlugins) {
-        plugin.clientSendRequest(context);
+        plugin.clientSendRequest(context);        // get meta-data from plugins
       }
       
+      writeHandshake(out);                       // prepend handshake if needed
       META_WRITER.write(context.requestCallMeta(), out);
       out.writeString(m.getName());               // write message name
-      writeRequest(m.getRequest(), request, out); // write request payload
       
+      bbo.append(payload);
+      
+      List<ByteBuffer> requestBytes = bbo.getBufferList();
+
       if (m.isOneWay() && t.isConnected()) {      // send one-way message
-        t.writeBuffers(bbo.getBufferList());
+        t.writeBuffers(requestBytes);
+        
         return null;
       } else {                                    // two-way message
-        List<ByteBuffer> response = t.transceive(bbo.getBufferList());
+        List<ByteBuffer> response = t.transceive(requestBytes);
+        context.setResponsePayload(response);
         ByteBufferInputStream bbi = new ByteBufferInputStream(response);
         in = DecoderFactory.defaultFactory().createBinaryDecoder(bbi, in);
       }

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java Fri Jun 25 23:40:37 2010
@@ -96,15 +96,18 @@ public abstract class Responder {
     Decoder in = DecoderFactory.defaultFactory().createBinaryDecoder(
         new ByteBufferInputStream(buffers), null);
     ByteBufferOutputStream bbo = new ByteBufferOutputStream();
-    Encoder out = new BinaryEncoder(bbo);
+    BinaryEncoder out = new BinaryEncoder(bbo);
     Exception error = null;
     RPCContext context = new RPCContext();
+    List<ByteBuffer> payload = null;
+    List<ByteBuffer> handshake = null;
     boolean wasConnected = connection != null && connection.isConnected();
     try {
       Protocol remote = handshake(in, out, connection);
       if (remote == null)                        // handshake failed
         return bbo.getBufferList();
-
+      handshake = bbo.getBufferList();
+      
       // read request using remote protocol specification
       context.setRequestCallMeta(META_READER.read(null, in));
       String messageName = in.readString(null).toString();
@@ -112,10 +115,10 @@ public abstract class Responder {
       if (rm == null)
         throw new AvroRuntimeException("No such remote message: "+messageName);
       
-      context.setMessage(rm);
-      
       Object request = readRequest(rm.getRequest(), in);
       
+      context.setRequestPayload(buffers);
+      context.setMessage(rm);
       for (RPCPlugin plugin : rpcMetaPlugins) {
         plugin.serverReceiveRequest(context);
       }
@@ -129,6 +132,7 @@ public abstract class Responder {
         throw new AvroRuntimeException("Not both one-way: "+messageName);
 
       Object response = null;
+      
       try {
         response = respond(m, request);
         context.setResponse(response);
@@ -140,27 +144,33 @@ public abstract class Responder {
       if (m.isOneWay() && wasConnected)           // no response data
         return null;
 
-      for (RPCPlugin plugin : rpcMetaPlugins) {
-        plugin.serverSendResponse(context);
-      }
-      
-      META_WRITER.write(context.responseCallMeta(), out);
       out.writeBoolean(error != null);
       if (error == null)
         writeResponse(m.getResponse(), response, out);
       else
         writeError(m.getErrors(), error, out);
-
     } catch (Exception e) {                       // system error
       LOG.warn("system error", e);
       context.setError(e);
       bbo = new ByteBufferOutputStream();
       out = new BinaryEncoder(bbo);
-      META_WRITER.write(context.responseCallMeta(), out);
       out.writeBoolean(true);
       writeError(Protocol.SYSTEM_ERRORS, new Utf8(e.toString()), out);
     }
-      
+
+    payload = bbo.getBufferList();
+    
+    // Grab meta-data from plugins
+    context.setResponsePayload(payload);
+    for (RPCPlugin plugin : rpcMetaPlugins) {
+      plugin.serverSendResponse(context);
+    }
+    META_WRITER.write(context.responseCallMeta(), out);
+    
+    // Prepend handshake and append payload
+    bbo.prepend(handshake);
+    bbo.append(payload);
+
     return bbo.getBufferList();
   }
 

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java Fri Jun 25 23:40:37 2010
@@ -80,6 +80,7 @@ public class SocketTransceiver extends T
     throws IOException {
     if (buffers == null) return;                  // no data to write
     for (ByteBuffer buffer : buffers) {
+      if (buffer.limit() == 0) continue;
       writeLength(buffer.limit());                // length-prefix
       channel.write(buffer);
     }

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/RPCMetaTestPlugin.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/RPCMetaTestPlugin.java?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/RPCMetaTestPlugin.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/RPCMetaTestPlugin.java Fri Jun 25 23:40:37 2010
@@ -31,7 +31,8 @@ import org.apache.avro.util.Utf8;
  * This plugin tests handshake and call state by passing a string as metadata,
  * slowly building it up at each instrumentation point, testing it as it goes.
  * Finally, after the call or handshake is complete, the constructed string is
- * tested.
+ * tested. It also tests that RPC context data is appropriately filled in 
+ * along the way by Requestor and Responder classes.
  */
 public final class RPCMetaTestPlugin extends RPCPlugin {
   
@@ -52,6 +53,8 @@ public final class RPCMetaTestPlugin ext
     
     Assert.assertNotNull(context.requestHandshakeMeta());
     Assert.assertNotNull(context.responseHandshakeMeta());
+    Assert.assertNull(context.getRequestPayload());
+    Assert.assertNull(context.getResponsePayload());
     
     if (!context.requestHandshakeMeta().containsKey(key)) return;
     
@@ -72,6 +75,8 @@ public final class RPCMetaTestPlugin ext
   public void clientFinishConnect(RPCContext context) {
     Map<Utf8,ByteBuffer> handshakeMeta = context.responseHandshakeMeta();
     
+    Assert.assertNull(context.getRequestPayload());
+    Assert.assertNull(context.getResponsePayload());
     Assert.assertNotNull(handshakeMeta);
     
     if (!handshakeMeta.containsKey(key)) return;
@@ -96,13 +101,18 @@ public final class RPCMetaTestPlugin ext
     ByteBuffer buf = ByteBuffer.wrap("ap".getBytes());
     context.requestCallMeta().put(key, buf);
     Assert.assertNotNull(context.getMessage());
+    Assert.assertNotNull(context.getRequestPayload());
+    Assert.assertNull(context.getResponsePayload());
   }
   
   @Override
   public void serverReceiveRequest(RPCContext context) {
     Map<Utf8,ByteBuffer> meta = context.requestCallMeta();
     
-    Assert.assertNotNull(meta);
+    Assert.assertNotNull(meta);    
+    Assert.assertNotNull(context.getMessage());
+    Assert.assertNotNull(context.getRequestPayload());
+    Assert.assertNull(context.getResponsePayload());
     
     if (!meta.containsKey(key)) return;
     
@@ -117,14 +127,15 @@ public final class RPCMetaTestPlugin ext
     buf = ByteBuffer.wrap((partialstr + "a").getBytes());
     Assert.assertTrue(buf.remaining() > 0);
     meta.put(key, buf);
-    
-    Assert.assertNotNull(context.getMessage());
   }
   
   @Override
   public void serverSendResponse(RPCContext context) {
     Assert.assertNotNull(context.requestCallMeta());
     Assert.assertNotNull(context.responseCallMeta());
+
+    Assert.assertNotNull(context.getRequestPayload());
+    Assert.assertNotNull(context.getResponsePayload());
     
     if (!context.requestCallMeta().containsKey(key)) return;
     
@@ -144,6 +155,8 @@ public final class RPCMetaTestPlugin ext
   @Override
   public void clientReceiveResponse(RPCContext context) {
     Assert.assertNotNull(context.responseCallMeta());
+    Assert.assertNotNull(context.getRequestPayload());
+    Assert.assertNotNull(context.getResponsePayload());
     
     if (!context.responseCallMeta().containsKey(key)) return;
     



Mime
View raw message