hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1537330 [7/7] - in /hadoop/common/branches/YARN-321/hadoop-common-project: ./ hadoop-annotations/ hadoop-auth/ hadoop-common/ hadoop-common/dev-support/ hadoop-common/src/ hadoop-common/src/main/bin/ hadoop-common/src/main/conf/ hadoop-com...
Date Wed, 30 Oct 2013 22:22:15 GMT
Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcAcceptedReply.java Wed Oct 30 22:21:59 2013
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.oncrpc;
 
-import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+import org.apache.hadoop.oncrpc.security.Verifier;
 
 /** 
  * Represents RPC message MSG_ACCEPTED reply body. See RFC 1831 for details.
@@ -41,43 +41,42 @@ public class RpcAcceptedReply extends Rp
       return ordinal();
     }
   };
+  
+  public static RpcAcceptedReply getAcceptInstance(int xid, 
+      Verifier verifier) {
+    return getInstance(xid, AcceptState.SUCCESS, verifier);
+  }
+  
+  public static RpcAcceptedReply getInstance(int xid, AcceptState state,
+      Verifier verifier) {
+    return new RpcAcceptedReply(xid, ReplyState.MSG_ACCEPTED, verifier,
+        state);
+  }
 
-  private final RpcAuthInfo verifier;
   private final AcceptState acceptState;
 
-  RpcAcceptedReply(int xid, RpcMessage.Type messageType, ReplyState state,
-      RpcAuthInfo verifier, AcceptState acceptState) {
-    super(xid, messageType, state);
-    this.verifier = verifier;
+  RpcAcceptedReply(int xid, ReplyState state, Verifier verifier,
+      AcceptState acceptState) {
+    super(xid, state, verifier);
     this.acceptState = acceptState;
   }
 
-  public static RpcAcceptedReply read(int xid, RpcMessage.Type messageType,
-      ReplyState replyState, XDR xdr) {
-    RpcAuthInfo verifier = RpcAuthInfo.read(xdr);
+  public static RpcAcceptedReply read(int xid, ReplyState replyState, XDR xdr) {
+    Verifier verifier = Verifier.readFlavorAndVerifier(xdr);
     AcceptState acceptState = AcceptState.fromValue(xdr.readInt());
-    return new RpcAcceptedReply(xid, messageType, replyState, verifier,
-        acceptState);
-  }
-
-  public RpcAuthInfo getVerifier() {
-    return verifier;
+    return new RpcAcceptedReply(xid, replyState, verifier, acceptState);
   }
 
   public AcceptState getAcceptState() {
     return acceptState;
   }
   
-  public static XDR voidReply(XDR xdr, int xid) {
-    return voidReply(xdr, xid, AcceptState.SUCCESS);
-  }
-  
-  public static XDR voidReply(XDR xdr, int xid, AcceptState acceptState) {
+  @Override
+  public XDR write(XDR xdr) {
     xdr.writeInt(xid);
-    xdr.writeInt(RpcMessage.Type.RPC_REPLY.getValue());
-    xdr.writeInt(ReplyState.MSG_ACCEPTED.getValue());
-    xdr.writeInt(AuthFlavor.AUTH_NONE.getValue());
-    xdr.writeVariableOpaque(new byte[0]);
+    xdr.writeInt(messageType.getValue());
+    xdr.writeInt(replyState.getValue());
+    Verifier.writeFlavorAndVerifier(verifier, xdr);
     xdr.writeInt(acceptState.getValue());
     return xdr;
   }

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCall.java Wed Oct 30 22:21:59 2013
@@ -19,6 +19,8 @@ package org.apache.hadoop.oncrpc;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.oncrpc.security.Credentials;
+import org.apache.hadoop.oncrpc.security.Verifier;
 
 /**
  * Represents an RPC message of type RPC call as defined in RFC 1831
@@ -26,21 +28,36 @@ import org.apache.commons.logging.LogFac
 public class RpcCall extends RpcMessage {
   public static final int RPC_VERSION = 2;
   private static final Log LOG = LogFactory.getLog(RpcCall.class);
+  
+  public static RpcCall read(XDR xdr) {
+    return new RpcCall(xdr.readInt(), RpcMessage.Type.fromValue(xdr.readInt()),
+        xdr.readInt(), xdr.readInt(), xdr.readInt(), xdr.readInt(), 
+        Credentials.readFlavorAndCredentials(xdr),
+        Verifier.readFlavorAndVerifier(xdr));
+  }
+  
+  public static RpcCall getInstance(int xid, int program, int version,
+      int procedure, Credentials cred, Verifier verifier) {
+    return new RpcCall(xid, RpcMessage.Type.RPC_CALL, 2, program, version,
+        procedure, cred, verifier);
+  }
+  
   private final int rpcVersion;
   private final int program;
   private final int version;
   private final int procedure;
-  private final RpcAuthInfo credential;
-  private final RpcAuthInfo verifier;
+  private final Credentials credentials;
+  private final Verifier verifier;
 
-  protected RpcCall(int xid, RpcMessage.Type messageType, int rpcVersion, int program,
-      int version, int procedure, RpcAuthInfo credential, RpcAuthInfo verifier) {
+  protected RpcCall(int xid, RpcMessage.Type messageType, int rpcVersion,
+      int program, int version, int procedure, Credentials credential,
+      Verifier verifier) {
     super(xid, messageType);
     this.rpcVersion = rpcVersion;
     this.program = program;
     this.version = version;
     this.procedure = procedure;
-    this.credential = credential;
+    this.credentials = credential;
     this.verifier = verifier;
     if (LOG.isTraceEnabled()) {
       LOG.trace(this);
@@ -79,29 +96,25 @@ public class RpcCall extends RpcMessage 
     return procedure;
   }
   
-  public RpcAuthInfo getCredential() {
-    return credential;
+  public Credentials getCredential() {
+    return credentials;
   }
 
-  public RpcAuthInfo getVerifier() {
+  public Verifier getVerifier() {
     return verifier;
   }
   
-  public static RpcCall read(XDR xdr) {
-    return new RpcCall(xdr.readInt(), RpcMessage.Type.fromValue(xdr.readInt()),
-        xdr.readInt(), xdr.readInt(),
-        xdr.readInt(), xdr.readInt(), RpcAuthInfo.read(xdr),
-        RpcAuthInfo.read(xdr));
-  }
-  
-  public static void write(XDR out, int xid, int program, int progVersion,
-      int procedure) {
-    out.writeInt(xid);
-    out.writeInt(RpcMessage.Type.RPC_CALL.getValue());
-    out.writeInt(2);
-    out.writeInt(program);
-    out.writeInt(progVersion);
-    out.writeInt(procedure);
+  @Override
+  public XDR write(XDR xdr) {
+    xdr.writeInt(xid);
+    xdr.writeInt(RpcMessage.Type.RPC_CALL.getValue());
+    xdr.writeInt(2);
+    xdr.writeInt(program);
+    xdr.writeInt(version);
+    xdr.writeInt(procedure);
+    Credentials.writeFlavorAndCredentials(credentials, xdr);
+    Verifier.writeFlavorAndVerifier(verifier, xdr);
+    return xdr;
   }
   
   @Override
@@ -109,6 +122,6 @@ public class RpcCall extends RpcMessage 
     return String.format("Xid:%d, messageType:%s, rpcVersion:%d, program:%d,"
         + " version:%d, procedure:%d, credential:%s, verifier:%s", xid,
         messageType, rpcVersion, program, version, procedure,
-        credential.toString(), verifier.toString());
+        credentials.toString(), verifier.toString());
   }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcCallCache.java Wed Oct 30 22:21:59 2013
@@ -44,7 +44,7 @@ import com.google.common.annotations.Vis
 public class RpcCallCache {
   
   public static class CacheEntry {
-    private XDR response; // null if no response has been sent
+    private RpcResponse response; // null if no response has been sent
     
     public CacheEntry() {
       response = null;
@@ -58,11 +58,11 @@ public class RpcCallCache {
       return response != null;
     }
     
-    public XDR getResponse() {
+    public RpcResponse getResponse() {
       return response;
     }
     
-    public void setResponse(XDR response) {
+    public void setResponse(RpcResponse response) {
       this.response = response;
     }
   }
@@ -128,13 +128,13 @@ public class RpcCallCache {
   }
 
   /** Mark a request as completed and add corresponding response to the cache */
-  public void callCompleted(InetAddress clientId, int xid, XDR response) {
+  public void callCompleted(InetAddress clientId, int xid, RpcResponse response) {
     ClientRequest req = new ClientRequest(clientId, xid);
     CacheEntry e;
     synchronized(map) {
       e = map.get(req);
     }
-    e.setResponse(response);
+    e.response = response;
   }
   
   /**

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcDeniedReply.java Wed Oct 30 22:21:59 2013
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.oncrpc;
 
-import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
+import org.apache.hadoop.oncrpc.security.Verifier;
 
 /** 
  * Represents RPC message MSG_DENIED reply body. See RFC 1831 for details.
@@ -40,16 +40,16 @@ public class RpcDeniedReply extends RpcR
 
   private final RejectState rejectState;
 
-  RpcDeniedReply(int xid, RpcMessage.Type messageType, ReplyState replyState,
-      RejectState rejectState) {
-    super(xid, messageType, replyState);
+  public RpcDeniedReply(int xid, ReplyState replyState,
+      RejectState rejectState, Verifier verifier) {
+    super(xid, replyState, verifier);
     this.rejectState = rejectState;
   }
 
-  public static RpcDeniedReply read(int xid, RpcMessage.Type messageType,
-      ReplyState replyState, XDR xdr) {
+  public static RpcDeniedReply read(int xid, ReplyState replyState, XDR xdr) {
+    Verifier verifier = Verifier.readFlavorAndVerifier(xdr);
     RejectState rejectState = RejectState.fromValue(xdr.readInt());
-    return new RpcDeniedReply(xid, messageType, replyState, rejectState);
+    return new RpcDeniedReply(xid, replyState, rejectState, verifier);
   }
 
   public RejectState getRejectState() {
@@ -59,17 +59,17 @@ public class RpcDeniedReply extends RpcR
   @Override
   public String toString() {
     return new StringBuffer().append("xid:").append(xid)
-        .append(",messageType:").append(messageType).append("rejectState:")
+        .append(",messageType:").append(messageType).append("verifier_flavor:")
+        .append(verifier.getFlavor()).append("rejectState:")
         .append(rejectState).toString();
   }
   
-  public static XDR voidReply(XDR xdr, int xid, ReplyState msgAccepted,
-      RejectState rejectState) {
+  @Override
+  public XDR write(XDR xdr) {
     xdr.writeInt(xid);
-    xdr.writeInt(RpcMessage.Type.RPC_REPLY.getValue());
-    xdr.writeInt(msgAccepted.getValue());
-    xdr.writeInt(AuthFlavor.AUTH_NONE.getValue());
-    xdr.writeVariableOpaque(new byte[0]);
+    xdr.writeInt(messageType.getValue());
+    xdr.writeInt(replyState.getValue());
+    Verifier.writeFlavorAndVerifier(verifier, xdr);
     xdr.writeInt(rejectState.getValue());
     return xdr;
   }

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcMessage.java Wed Oct 30 22:21:59 2013
@@ -50,6 +50,8 @@ public abstract class RpcMessage {
     this.messageType = messageType;
   }
   
+  public abstract XDR write(XDR xdr);
+  
   public int getXid() {
     return xid;
   }

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java Wed Oct 30 22:21:59 2013
@@ -18,21 +18,24 @@
 package org.apache.hadoop.oncrpc;
 
 import java.io.IOException;
-import java.net.InetAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
-import org.apache.hadoop.oncrpc.RpcCallCache.CacheEntry;
+import org.apache.hadoop.oncrpc.security.Verifier;
 import org.apache.hadoop.portmap.PortmapMapping;
 import org.apache.hadoop.portmap.PortmapRequest;
-import org.jboss.netty.channel.Channel;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 
 /**
  * Class for writing RPC server programs based on RFC 1050. Extend this class
  * and implement {@link #handleInternal} to handle the requests received.
  */
-public abstract class RpcProgram {
+public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
   private static final Log LOG = LogFactory.getLog(RpcProgram.class);
   public static final int RPCB_PORT = 111;
   private final String program;
@@ -41,7 +44,6 @@ public abstract class RpcProgram {
   private final int progNumber;
   private final int lowProgVersion;
   private final int highProgVersion;
-  private final RpcCallCache rpcCallCache;
   
   /**
    * Constructor
@@ -52,19 +54,15 @@ public abstract class RpcProgram {
    * @param progNumber program number as defined in RFC 1050
    * @param lowProgVersion lowest version of the specification supported
    * @param highProgVersion highest version of the specification supported
-   * @param cacheSize size of cache to handle duplciate requests. Size <= 0
-   *          indicates no cache.
    */
   protected RpcProgram(String program, String host, int port, int progNumber,
-      int lowProgVersion, int highProgVersion, int cacheSize) {
+      int lowProgVersion, int highProgVersion) {
     this.program = program;
     this.host = host;
     this.port = port;
     this.progNumber = progNumber;
     this.lowProgVersion = lowProgVersion;
     this.highProgVersion = highProgVersion;
-    this.rpcCallCache = cacheSize > 0 ? new RpcCallCache(program, cacheSize)
-        : null;
   }
 
   /**
@@ -102,88 +100,50 @@ public abstract class RpcProgram {
     }
   }
 
-  /**
-   * Handle an RPC request.
-   * @param rpcCall RPC call that is received
-   * @param in xdr with cursor at reading the remaining bytes of a method call
-   * @param out xdr output corresponding to Rpc reply
-   * @param client making the Rpc request
-   * @param channel connection over which Rpc request is received
-   * @return response xdr response
-   */
-  protected abstract XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
-      InetAddress client, Channel channel);
-  
-  public XDR handle(XDR xdr, InetAddress client, Channel channel) {
-    XDR out = new XDR();
-    RpcCall rpcCall = RpcCall.read(xdr);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(program + " procedure #" + rpcCall.getProcedure());
-    }
-    
-    if (!checkProgram(rpcCall.getProgram())) {
-      return programMismatch(out, rpcCall);
-    }
-
-    if (!checkProgramVersion(rpcCall.getVersion())) {
-      return programVersionMismatch(out, rpcCall);
-    }
-    
-    // Check for duplicate requests in the cache for non-idempotent requests
-    boolean idempotent = rpcCallCache != null && !isIdempotent(rpcCall);
-    if (idempotent) {
-      CacheEntry entry = rpcCallCache.checkOrAddToCache(client, rpcCall.getXid());
-      if (entry != null) { // in cache 
-        if (entry.isCompleted()) {
-          LOG.info("Sending the cached reply to retransmitted request "
-              + rpcCall.getXid());
-          return entry.getResponse();
-        } else { // else request is in progress
-          LOG.info("Retransmitted request, transaction still in progress "
-              + rpcCall.getXid());
-          // TODO: ignore the request?
-        }
-      }
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+      throws Exception {
+    RpcInfo info = (RpcInfo) e.getMessage();
+    RpcCall call = (RpcCall) info.header();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(program + " procedure #" + call.getProcedure());
     }
     
-    XDR response = handleInternal(rpcCall, xdr, out, client, channel);
-    if (response.size() == 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("No sync response, expect an async response for request XID="
-            + rpcCall.getXid());
-      }
+    if (this.progNumber != call.getProgram()) {
+      LOG.warn("Invalid RPC call program " + call.getProgram());
+      RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+          AcceptState.PROG_UNAVAIL, Verifier.VERIFIER_NONE);
+
+      XDR out = new XDR();
+      reply.write(out);
+      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+          .buffer());
+      RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+      RpcUtil.sendRpcResponse(ctx, rsp);
+      return;
+    }
+
+    int ver = call.getVersion();
+    if (ver < lowProgVersion || ver > highProgVersion) {
+      LOG.warn("Invalid RPC call version " + ver);
+      RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
+          AcceptState.PROG_MISMATCH, Verifier.VERIFIER_NONE);
+
+      XDR out = new XDR();
+      reply.write(out);
+      out.writeInt(lowProgVersion);
+      out.writeInt(highProgVersion);
+      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
+          .buffer());
+      RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+      RpcUtil.sendRpcResponse(ctx, rsp);
+      return;
     }
     
-    // Add the request to the cache
-    if (idempotent) {
-      rpcCallCache.callCompleted(client, rpcCall.getXid(), response);
-    }
-    return response;
-  }
-  
-  private XDR programMismatch(XDR out, RpcCall call) {
-    LOG.warn("Invalid RPC call program " + call.getProgram());
-    RpcAcceptedReply.voidReply(out, call.getXid(), AcceptState.PROG_UNAVAIL);
-    return out;
-  }
-  
-  private XDR programVersionMismatch(XDR out, RpcCall call) {
-    LOG.warn("Invalid RPC call version " + call.getVersion());
-    RpcAcceptedReply.voidReply(out, call.getXid(), AcceptState.PROG_MISMATCH);
-    out.writeInt(lowProgVersion);
-    out.writeInt(highProgVersion);
-    return out;
-  }
-  
-  private boolean checkProgram(int progNumber) {
-    return this.progNumber == progNumber;
-  }
-  
-  /** Return true if a the program version in rpcCall is supported */
-  private boolean checkProgramVersion(int programVersion) {
-    return programVersion >= lowProgVersion
-        && programVersion <= highProgVersion;
+    handleInternal(ctx, info);
   }
+
+  protected abstract void handleInternal(ChannelHandlerContext ctx, RpcInfo info);
   
   @Override
   public String toString() {

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcReply.java Wed Oct 30 22:21:59 2013
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.oncrpc;
 
+import org.apache.hadoop.oncrpc.security.RpcAuthInfo;
+import org.apache.hadoop.oncrpc.security.Verifier;
+
+import com.google.common.base.Preconditions;
+
 /**
  * Represents an RPC message of type RPC reply as defined in RFC 1831
  */
@@ -36,28 +41,35 @@ public abstract class RpcReply extends R
     }
   }
   
-  private final ReplyState state;
+  protected final ReplyState replyState;
+  protected final Verifier verifier;
   
-  RpcReply(int xid, RpcMessage.Type messageType, ReplyState state) {
-    super(xid, messageType);
-    this.state = state;
-    validateMessageType(RpcMessage.Type.RPC_REPLY);
+  RpcReply(int xid, ReplyState state, Verifier verifier) {
+    super(xid, RpcMessage.Type.RPC_REPLY);
+    this.replyState = state;
+    this.verifier = verifier;
+  }
+  
+  public RpcAuthInfo getVerifier() {
+    return verifier;
   }
 
   public static RpcReply read(XDR xdr) {
     int xid = xdr.readInt();
     final Type messageType = Type.fromValue(xdr.readInt());
+    Preconditions.checkState(messageType == RpcMessage.Type.RPC_REPLY);
+    
     ReplyState stat = ReplyState.fromValue(xdr.readInt());
     switch (stat) {
     case MSG_ACCEPTED:
-      return RpcAcceptedReply.read(xid, messageType, stat, xdr);
+      return RpcAcceptedReply.read(xid, stat, xdr);
     case MSG_DENIED:
-      return RpcDeniedReply.read(xid, messageType, stat, xdr);
+      return RpcDeniedReply.read(xid, stat, xdr);
     }
     return null;
   }
 
   public ReplyState getState() {
-    return state;
+    return replyState;
   }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java Wed Oct 30 22:21:59 2013
@@ -17,13 +17,152 @@
  */
 package org.apache.hadoop.oncrpc;
 
-/**
- * The XID in RPC call. It is used for starting with new seed after each reboot.
- */
-public class RpcUtil {
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+public final class RpcUtil {
+  /**
+   * The XID in RPC call. It is used for starting with new seed after each
+   * reboot.
+   */
   private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
 
   public static int getNewXid(String caller) {
     return xid = ++xid + caller.hashCode();
   }
+
+  public static void sendRpcResponse(ChannelHandlerContext ctx,
+      RpcResponse response) {
+    Channels.fireMessageReceived(ctx, response);
+  }
+
+  public static FrameDecoder constructRpcFrameDecoder() {
+    return new RpcFrameDecoder();
+  }
+
+  public static final SimpleChannelUpstreamHandler STAGE_RPC_MESSAGE_PARSER = new RpcMessageParserStage();
+  public static final SimpleChannelUpstreamHandler STAGE_RPC_TCP_RESPONSE = new RpcTcpResponseStage();
+  public static final SimpleChannelUpstreamHandler STAGE_RPC_UDP_RESPONSE = new RpcUdpResponseStage();
+
+  /**
+   * An RPC client can separate a RPC message into several frames (i.e.,
+   * fragments) when transferring it across the wire. RpcFrameDecoder
+   * reconstructs a full RPC message from these fragments.
+   *
+   * RpcFrameDecoder is a stateful pipeline stage. It has to be constructed for
+   * each RPC client.
+   */
+  static class RpcFrameDecoder extends FrameDecoder {
+    public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
+    private ChannelBuffer currentFrame;
+
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, Channel channel,
+        ChannelBuffer buf) {
+
+      if (buf.readableBytes() < 4)
+        return null;
+
+      buf.markReaderIndex();
+
+      byte[] fragmentHeader = new byte[4];
+      buf.readBytes(fragmentHeader);
+      int length = XDR.fragmentSize(fragmentHeader);
+      boolean isLast = XDR.isLastFragment(fragmentHeader);
+
+      if (buf.readableBytes() < length) {
+        buf.resetReaderIndex();
+        return null;
+      }
+
+      ChannelBuffer newFragment = buf.readSlice(length);
+      if (currentFrame == null) {
+        currentFrame = newFragment;
+      } else {
+        currentFrame = ChannelBuffers.wrappedBuffer(currentFrame, newFragment);
+      }
+
+      if (isLast) {
+        ChannelBuffer completeFrame = currentFrame;
+        currentFrame = null;
+        return completeFrame;
+      } else {
+        return null;
+      }
+    }
+  }
+
+  /**
+   * RpcMessageParserStage parses the network bytes and encapsulates the RPC
+   * request into a RpcInfo instance.
+   */
+  static final class RpcMessageParserStage extends SimpleChannelUpstreamHandler {
+    private static final Log LOG = LogFactory
+        .getLog(RpcMessageParserStage.class);
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+      ByteBuffer b = buf.toByteBuffer().asReadOnlyBuffer();
+      XDR in = new XDR(b, XDR.State.READING);
+
+      RpcInfo info = null;
+      try {
+        RpcCall callHeader = RpcCall.read(in);
+        ChannelBuffer dataBuffer = ChannelBuffers.wrappedBuffer(in.buffer()
+            .slice());
+        info = new RpcInfo(callHeader, dataBuffer, ctx, e.getChannel(),
+            e.getRemoteAddress());
+      } catch (Exception exc) {
+        LOG.info("Malformed RPC request from " + e.getRemoteAddress());
+      }
+
+      if (info != null) {
+        Channels.fireMessageReceived(ctx, info);
+      }
+    }
+  }
+
+  /**
+   * RpcTcpResponseStage sends an RpcResponse across the wire with the
+   * appropriate fragment header.
+   */
+  private static class RpcTcpResponseStage extends SimpleChannelUpstreamHandler {
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      RpcResponse r = (RpcResponse) e.getMessage();
+      byte[] fragmentHeader = XDR.recordMark(r.data().readableBytes(), true);
+      ChannelBuffer header = ChannelBuffers.wrappedBuffer(fragmentHeader);
+      ChannelBuffer d = ChannelBuffers.wrappedBuffer(header, r.data());
+      e.getChannel().write(d);
+    }
+  }
+
+  /**
+   * RpcUdpResponseStage sends an RpcResponse as a UDP packet, which does not
+   * require a fragment header.
+   */
+  private static final class RpcUdpResponseStage extends
+      SimpleChannelUpstreamHandler {
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+        throws Exception {
+      RpcResponse r = (RpcResponse) e.getMessage();
+      e.getChannel().write(r.data(), r.remoteAddress());
+    }
+  }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java Wed Oct 30 22:21:59 2013
@@ -20,8 +20,6 @@ package org.apache.hadoop.oncrpc;
 import java.net.InetSocketAddress;
 import java.util.concurrent.Executors;
 
-import org.apache.hadoop.oncrpc.RpcFrameDecoder;
-import org.apache.hadoop.oncrpc.XDR;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
@@ -55,7 +53,8 @@ public class SimpleTcpClient {
     this.pipelineFactory = new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() {
-        return Channels.pipeline(new RpcFrameDecoder(),
+        return Channels.pipeline(
+            RpcUtil.constructRpcFrameDecoder(),
             new SimpleTcpClientHandler(request));
       }
     };

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java Wed Oct 30 22:21:59 2013
@@ -27,6 +27,7 @@ import org.jboss.netty.channel.ChannelFa
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 
 /**
@@ -35,8 +36,7 @@ import org.jboss.netty.channel.socket.ni
 public class SimpleTcpServer {
   public static final Log LOG = LogFactory.getLog(SimpleTcpServer.class);
   protected final int port;
-  protected final ChannelPipelineFactory pipelineFactory;
-  protected final RpcProgram rpcProgram;
+  protected final SimpleChannelUpstreamHandler rpcProgram;
   
   /** The maximum number of I/O worker threads */
   protected final int workerCount;
@@ -50,17 +50,6 @@ public class SimpleTcpServer {
     this.port = port;
     this.rpcProgram = program;
     this.workerCount = workercount;
-    this.pipelineFactory = getPipelineFactory();
-  }
-
-  public ChannelPipelineFactory getPipelineFactory() {
-    return new ChannelPipelineFactory() {
-      @Override
-      public ChannelPipeline getPipeline() {
-        return Channels.pipeline(new RpcFrameDecoder(),
-            new SimpleTcpServerHandler(rpcProgram));
-      }
-    };
   }
   
   public void run() {
@@ -77,7 +66,15 @@ public class SimpleTcpServer {
     }
     
     ServerBootstrap bootstrap = new ServerBootstrap(factory);
-    bootstrap.setPipelineFactory(pipelineFactory);
+    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+
+      @Override
+      public ChannelPipeline getPipeline() throws Exception {
+        return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
+            RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
+            RpcUtil.STAGE_RPC_TCP_RESPONSE);
+      }
+    });
     bootstrap.setOption("child.tcpNoDelay", true);
     bootstrap.setOption("child.keepAlive", true);
     

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpClient.java Wed Oct 30 22:21:59 2013
@@ -57,8 +57,7 @@ public class SimpleUdpClient {
     clientSocket.receive(receivePacket);
 
     // Check reply status
-    XDR xdr = new XDR();
-    xdr.writeFixedOpaque(Arrays.copyOfRange(receiveData, 0,
+    XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0,
         receivePacket.getLength()));
     RpcReply reply = RpcReply.read(xdr);
     if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) {

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java Wed Oct 30 22:21:59 2013
@@ -23,9 +23,8 @@ import java.util.concurrent.Executors;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.socket.DatagramChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
 
@@ -38,20 +37,13 @@ public class SimpleUdpServer {
   private final int RECEIVE_BUFFER_SIZE = 65536;
 
   protected final int port;
-  protected final ChannelPipelineFactory pipelineFactory;
-  protected final RpcProgram rpcProgram;
+  protected final SimpleChannelUpstreamHandler rpcProgram;
   protected final int workerCount;
 
-  public SimpleUdpServer(int port, RpcProgram program, int workerCount) {
+  public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) {
     this.port = port;
     this.rpcProgram = program;
     this.workerCount = workerCount;
-    this.pipelineFactory = new ChannelPipelineFactory() {
-      @Override
-      public ChannelPipeline getPipeline() {
-        return Channels.pipeline(new SimpleUdpServerHandler(rpcProgram));
-      }
-    };
   }
 
   public void run() {
@@ -60,8 +52,9 @@ public class SimpleUdpServer {
         Executors.newCachedThreadPool(), workerCount);
 
     ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
-    ChannelPipeline p = b.getPipeline();
-    p.addLast("handler", new SimpleUdpServerHandler(rpcProgram));
+    b.setPipeline(Channels.pipeline(
+            RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
+            RpcUtil.STAGE_RPC_UDP_RESPONSE));
 
     b.setOption("broadcast", "false");
     b.setOption("sendBufferSize", SEND_BUFFER_SIZE);

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java Wed Oct 30 22:21:59 2013
@@ -17,402 +17,256 @@
  */
 package org.apache.hadoop.oncrpc;
 
-import java.io.PrintStream;
-import java.util.Arrays;
+import java.nio.ByteBuffer;
 
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /**
  * Utility class for building XDR messages based on RFC 4506.
- * <p>
- * This class maintains a buffer into which java types are written as
- * XDR types for building XDR messages. Similarly this class can
- * be used to get java types from an XDR request or response.
- * <p>
- * Currently only a subset of XDR types defined in RFC 4506 are supported.
+ *
+ * Key points of the format:
+ *
+ * <ul>
+ * <li>Primitives are stored in big-endian order (i.e., the default byte order
+ * of ByteBuffer).</li>
+ * <li>Booleans are stored as an integer.</li>
+ * <li>Each field in the message is always aligned by 4.</li>
+ * </ul>
+ *
  */
-public class XDR {
-  private final static  String HEXES = "0123456789abcdef";
-  
-  /** Internal buffer for reading or writing to */
-  private byte[] bytearr;
-  
-  /** Place to read from or write to */
-  private int cursor;
+public final class XDR {
+  private static final int DEFAULT_INITIAL_CAPACITY = 256;
+  private static final int SIZEOF_INT = 4;
+  private static final int SIZEOF_LONG = 8;
+  private static final byte[] PADDING_BYTES = new byte[] { 0, 0, 0, 0 };
 
-  public XDR() {
-    this(new byte[0]);
-  }
+  private ByteBuffer buf;
 
-  public XDR(byte[] data) {
-    bytearr = Arrays.copyOf(data, data.length);
-    cursor = 0;
+  public enum State {
+    READING, WRITING,
   }
 
+  private final State state;
+
   /**
-   * @param bytes bytes to be appended to internal buffer
+   * Construct a new XDR message buffer.
+   *
+   * @param initialCapacity
+   *          the initial capacity of the buffer.
    */
-  private void append(byte[] bytesToAdd) {
-    bytearr = append(bytearr, bytesToAdd);
+  public XDR(int initialCapacity) {
+    this(ByteBuffer.allocate(initialCapacity), State.WRITING);
   }
 
-  public int size() {
-    return bytearr.length;
+  public XDR() {
+    this(DEFAULT_INITIAL_CAPACITY);
   }
 
-  /** Skip some bytes by moving the cursor */
-  public void skip(int size) {
-    cursor += size;
+  public XDR(ByteBuffer buf, State state) {
+    this.buf = buf;
+    this.state = state;
   }
 
   /**
-   * Write Java primitive integer as XDR signed integer.
-   * 
-   * Definition of XDR signed integer from RFC 4506:
-   * <pre>
-   * An XDR signed integer is a 32-bit datum that encodes an integer in
-   * the range [-2147483648,2147483647].  The integer is represented in
-   * two's complement notation.  The most and least significant bytes are
-   * 0 and 3, respectively.  Integers are declared as follows:
+   * Wraps a byte array as a read-only XDR message. There's no copy involved,
+   * thus it is the client's responsibility to ensure that the byte array
+   * remains unmodified when using the XDR object.
    * 
-   *       int identifier;
-   * 
-   *            (MSB)                   (LSB)
-   *          +-------+-------+-------+-------+
-   *          |byte 0 |byte 1 |byte 2 |byte 3 |                      INTEGER
-   *          +-------+-------+-------+-------+
-   *          <------------32 bits------------>
-   * </pre>
+   * @param src
+   *          the byte array to be wrapped.
    */
-  public void writeInt(int data) {
-    append(toBytes(data));
+  public XDR(byte[] src) {
+    this(ByteBuffer.wrap(src).asReadOnlyBuffer(), State.READING);
+  }
+
+  public XDR asReadOnlyWrap() {
+    ByteBuffer b = buf.asReadOnlyBuffer();
+    if (state == State.WRITING) {
+      b.flip();
+    }
+
+    XDR n = new XDR(b, State.READING);
+    return n;
+  }
+
+  public ByteBuffer buffer() {
+    return buf.duplicate();
+  }
+
+  public int size() {
+    // TODO: This overloading intends to be compatible with the semantics of
+    // the previous version of the class. This function should be separated into
+    // two with clear semantics.
+    return state == State.READING ? buf.limit() : buf.position();
   }
 
-  /**
-   * Read an XDR signed integer and return as Java primitive integer.
-   */
   public int readInt() {
-    byte byte0 = bytearr[cursor++];
-    byte byte1 = bytearr[cursor++];
-    byte byte2 = bytearr[cursor++];
-    byte byte3 = bytearr[cursor++];
-    return (XDR.toShort(byte0) << 24) + (XDR.toShort(byte1) << 16)
-        + (XDR.toShort(byte2) << 8) + XDR.toShort(byte3);
+    Preconditions.checkState(state == State.READING);
+    return buf.getInt();
   }
 
-  /**
-   * Write Java primitive boolean as an XDR boolean.
-   * 
-   * Definition of XDR boolean from RFC 4506:
-   * <pre>
-   *    Booleans are important enough and occur frequently enough to warrant
-   *    their own explicit type in the standard.  Booleans are declared as
-   *    follows:
-   * 
-   *          bool identifier;
-   * 
-   *    This is equivalent to:
-   * 
-   *          enum { FALSE = 0, TRUE = 1 } identifier;
-   * </pre>
-   */
-  public void writeBoolean(boolean data) {
-    this.writeInt(data ? 1 : 0);
+  public void writeInt(int v) {
+    ensureFreeSpace(SIZEOF_INT);
+    buf.putInt(v);
   }
 
-  /**
-   * Read an XDR boolean and return as Java primitive boolean.
-   */
   public boolean readBoolean() {
-    return readInt() == 0 ? false : true;
+    Preconditions.checkState(state == State.READING);
+    return buf.getInt() != 0;
   }
 
-  /**
-   * Write Java primitive long to an XDR signed long.
-   * 
-   * Definition of XDR signed long from RFC 4506:
-   * <pre>
-   *    The standard also defines 64-bit (8-byte) numbers called hyper
-   *    integers and unsigned hyper integers.  Their representations are the
-   *    obvious extensions of integer and unsigned integer defined above.
-   *    They are represented in two's complement notation.The most and
-   *    least significant bytes are 0 and 7, respectively. Their
-   *    declarations:
-   * 
-   *    hyper identifier; unsigned hyper identifier;
-   * 
-   *         (MSB)                                                   (LSB)
-   *       +-------+-------+-------+-------+-------+-------+-------+-------+
-   *       |byte 0 |byte 1 |byte 2 |byte 3 |byte 4 |byte 5 |byte 6 |byte 7 |
-   *       +-------+-------+-------+-------+-------+-------+-------+-------+
-   *       <----------------------------64 bits---------------------------->
-   *                                                  HYPER INTEGER
-   *                                                  UNSIGNED HYPER INTEGER
-   * </pre>
-   */
-  public void writeLongAsHyper(long data) {
-       byte byte0 = (byte) ((data & 0xff00000000000000l) >> 56);
-    byte byte1 = (byte) ((data & 0x00ff000000000000l) >> 48);
-    byte byte2 = (byte) ((data & 0x0000ff0000000000l) >> 40);
-    byte byte3 = (byte) ((data & 0x000000ff00000000l) >> 32);
-    byte byte4 = (byte) ((data & 0x00000000ff000000l) >> 24);
-    byte byte5 = (byte) ((data & 0x0000000000ff0000l) >> 16);
-    byte byte6 = (byte) ((data & 0x000000000000ff00l) >> 8);
-    byte byte7 = (byte) ((data & 0x00000000000000ffl));
-    this.append(new byte[] { byte0, byte1, byte2, byte3, byte4, byte5, byte6, byte7 });
+  public void writeBoolean(boolean v) {
+    ensureFreeSpace(SIZEOF_INT);
+    buf.putInt(v ? 1 : 0);
   }
 
-  /**
-   * Read XDR signed hyper and return as java primitive long.
-   */
   public long readHyper() {
-    byte byte0 = bytearr[cursor++];
-    byte byte1 = bytearr[cursor++];
-    byte byte2 = bytearr[cursor++];
-    byte byte3 = bytearr[cursor++];
-    byte byte4 = bytearr[cursor++];
-    byte byte5 = bytearr[cursor++];
-    byte byte6 = bytearr[cursor++];
-    byte byte7 = bytearr[cursor++];
-    return ((long) XDR.toShort(byte0) << 56)
-        + ((long) XDR.toShort(byte1) << 48) + ((long) XDR.toShort(byte2) << 40)
-        + ((long) XDR.toShort(byte3) << 32) + ((long) XDR.toShort(byte4) << 24)
-        + ((long) XDR.toShort(byte5) << 16) + ((long) XDR.toShort(byte6) << 8)
-        + XDR.toShort(byte7);
+    Preconditions.checkState(state == State.READING);
+    return buf.getLong();
   }
 
-  /**
-   * Write a Java primitive byte array to XDR fixed-length opaque data.
-   * 
-   * Defintion of fixed-length opaque data from RFC 4506:
-   * <pre>
-   *    At times, fixed-length uninterpreted data needs to be passed among
-   *    machines.  This data is called "opaque" and is declared as follows:
-   * 
-   *          opaque identifier[n];
-   * 
-   *    where the constant n is the (static) number of bytes necessary to
-   *    contain the opaque data.  If n is not a multiple of four, then the n
-   *    bytes are followed by enough (0 to 3) residual zero bytes, r, to make
-   *    the total byte count of the opaque object a multiple of four.
-   * 
-   *           0        1     ...
-   *       +--------+--------+...+--------+--------+...+--------+
-   *       | byte 0 | byte 1 |...|byte n-1|    0   |...|    0   |
-   *       +--------+--------+...+--------+--------+...+--------+
-   *       |<-----------n bytes---------->|<------r bytes------>|
-   *       |<-----------n+r (where (n+r) mod 4 = 0)------------>|
-   *                                                    FIXED-LENGTH OPAQUE
-   * </pre>
-   */
-  public void writeFixedOpaque(byte[] data) {
-    writeFixedOpaque(data, data.length);
-  }
-
-  public void writeFixedOpaque(byte[] data, int length) {
-    append(Arrays.copyOf(data, length + XDR.pad(length, 4)));
+  public void writeLongAsHyper(long v) {
+    ensureFreeSpace(SIZEOF_LONG);
+    buf.putLong(v);
   }
 
   public byte[] readFixedOpaque(int size) {
-    byte[] ret = new byte[size];
-    for(int i = 0; i < size; i++) {
-      ret[i] = bytearr[cursor];
-      cursor++;
-    }
+    Preconditions.checkState(state == State.READING);
+    byte[] r = new byte[size];
+    buf.get(r);
+    alignPosition();
+    return r;
+  }
 
-    for(int i = 0; i < XDR.pad(size, 4); i++) {
-      cursor++;
-    }
-    return ret;
+  public void writeFixedOpaque(byte[] src, int length) {
+    ensureFreeSpace(alignUp(length));
+    buf.put(src, 0, length);
+    writePadding();
   }
 
-  /**
-   * Write a Java primitive byte array as XDR variable-length opque data.
-   * 
-   * Definition of XDR variable-length opaque data RFC 4506:
-   * 
-   * <pre>
-   *    The standard also provides for variable-length (counted) opaque data,
-   *    defined as a sequence of n (numbered 0 through n-1) arbitrary bytes
-   *    to be the number n encoded as an unsigned integer (as described
-   *    below), and followed by the n bytes of the sequence.
-   * 
-   *    Byte m of the sequence always precedes byte m+1 of the sequence, and
-   *    byte 0 of the sequence always follows the sequence's length (count).
-   *    If n is not a multiple of four, then the n bytes are followed by
-   *    enough (0 to 3) residual zero bytes, r, to make the total byte count
-   *    a multiple of four.  Variable-length opaque data is declared in the
-   *    following way:
-   * 
-   *          opaque identifier<m>;
-   *       or
-   *          opaque identifier<>;
-   * 
-   *    The constant m denotes an upper bound of the number of bytes that the
-   *    sequence may contain.  If m is not specified, as in the second
-   *    declaration, it is assumed to be (2**32) - 1, the maximum length.
-   * 
-   *    The constant m would normally be found in a protocol specification.
-   *    For example, a filing protocol may state that the maximum data
-   *    transfer size is 8192 bytes, as follows:
-   * 
-   *          opaque filedata<8192>;
-   * 
-   *             0     1     2     3     4     5   ...
-   *          +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
-   *          |        length n       |byte0|byte1|...| n-1 |  0  |...|  0  |
-   *          +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
-   *          |<-------4 bytes------->|<------n bytes------>|<---r bytes--->|
-   *                                  |<----n+r (where (n+r) mod 4 = 0)---->|
-   *                                                   VARIABLE-LENGTH OPAQUE
-   * 
-   *    It is an error to encode a length greater than the maximum described
-   *    in the specification.
-   * </pre>
-   */
-  public void writeVariableOpaque(byte[] data) {
-    this.writeInt(data.length);
-    this.writeFixedOpaque(data);
+  public void writeFixedOpaque(byte[] src) {
+    writeFixedOpaque(src, src.length);
   }
 
   public byte[] readVariableOpaque() {
-    int size = this.readInt();
-    return size != 0 ? this.readFixedOpaque(size) : null;
+    Preconditions.checkState(state == State.READING);
+    int size = readInt();
+    return readFixedOpaque(size);
   }
 
-  public void skipVariableOpaque() {
-    int length= this.readInt();
-    this.skip(length+XDR.pad(length, 4));
-  }
-  
-  /**
-   * Write Java String as XDR string.
-   * 
-   * Definition of XDR string from RFC 4506:
-   * 
-   * <pre>
-   *    The standard defines a string of n (numbered 0 through n-1) ASCII
-   *    bytes to be the number n encoded as an unsigned integer (as described
-   *    above), and followed by the n bytes of the string.  Byte m of the
-   *    string always precedes byte m+1 of the string, and byte 0 of the
-   *    string always follows the string's length.  If n is not a multiple of
-   *    four, then the n bytes are followed by enough (0 to 3) residual zero
-   *    bytes, r, to make the total byte count a multiple of four.  Counted
-   *    byte strings are declared as follows:
-   * 
-   *          string object<m>;
-   *       or
-   *          string object<>;
-   * 
-   *    The constant m denotes an upper bound of the number of bytes that a
-   *    string may contain.  If m is not specified, as in the second
-   *    declaration, it is assumed to be (2**32) - 1, the maximum length.
-   *    The constant m would normally be found in a protocol specification.
-   *    For example, a filing protocol may state that a file name can be no
-   *    longer than 255 bytes, as follows:
-   * 
-   *          string filename<255>;
-   * 
-   *             0     1     2     3     4     5   ...
-   *          +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
-   *          |        length n       |byte0|byte1|...| n-1 |  0  |...|  0  |
-   *          +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
-   *          |<-------4 bytes------->|<------n bytes------>|<---r bytes--->|
-   *                                  |<----n+r (where (n+r) mod 4 = 0)---->|
-   *                                                                   STRING
-   *    It is an error to encode a length greater than the maximum described
-   *    in the specification.
-   * </pre>
-   */
-  public void writeString(String data) {
-    this.writeVariableOpaque(data.getBytes());
+  public void writeVariableOpaque(byte[] src) {
+    ensureFreeSpace(SIZEOF_INT + alignUp(src.length));
+    buf.putInt(src.length);
+    writeFixedOpaque(src);
   }
 
   public String readString() {
-    return new String(this.readVariableOpaque());
+    return new String(readVariableOpaque());
   }
 
-  public void dump(PrintStream out) {
-    for(int i = 0; i < bytearr.length; i += 4) {
-      out.println(hex(bytearr[i]) + " " + hex(bytearr[i + 1]) + " "
-          + hex(bytearr[i + 2]) + " " + hex(bytearr[i + 3]));
-    }
+  public void writeString(String s) {
+    writeVariableOpaque(s.getBytes());
   }
 
-  @VisibleForTesting
-  public byte[] getBytes() {
-    return Arrays.copyOf(bytearr, bytearr.length);
+  private void writePadding() {
+    Preconditions.checkState(state == State.WRITING);
+    int p = pad(buf.position());
+    ensureFreeSpace(p);
+    buf.put(PADDING_BYTES, 0, p);
   }
 
-  public static byte[] append(byte[] bytes, byte[] bytesToAdd) {
-    byte[] newByteArray = new byte[bytes.length + bytesToAdd.length];
-    System.arraycopy(bytes, 0, newByteArray, 0, bytes.length);
-    System.arraycopy(bytesToAdd, 0, newByteArray, bytes.length, bytesToAdd.length);
-    return newByteArray;
+  private int alignUp(int length) {
+    return length + pad(length);
   }
 
-  private static int pad(int x, int y) {
-    return x % y == 0 ? 0 : y - (x % y);
+  private int pad(int length) {
+    switch (length % 4) {
+    case 1:
+      return 3;
+    case 2:
+      return 2;
+    case 3:
+      return 1;
+    default:
+      return 0;
+    }
   }
 
-  static byte[] toBytes(int n) {
-    byte[] ret = { (byte) ((n & 0xff000000) >> 24),
-        (byte) ((n & 0x00ff0000) >> 16), (byte) ((n & 0x0000ff00) >> 8),
-        (byte) (n & 0x000000ff) };
-    return ret;
+  private void alignPosition() {
+    buf.position(alignUp(buf.position()));
   }
 
-  private static short toShort(byte b) {
-    return b < 0 ? (short) (b + 256): (short) b;
+  private void ensureFreeSpace(int size) {
+    Preconditions.checkState(state == State.WRITING);
+    if (buf.remaining() < size) {
+      int newCapacity = buf.capacity() * 2;
+      int newRemaining = buf.capacity() + buf.remaining();
+
+      while (newRemaining < size) {
+        newRemaining += newCapacity;
+        newCapacity *= 2;
+      }
+
+      ByteBuffer newbuf = ByteBuffer.allocate(newCapacity);
+      buf.flip();
+      newbuf.put(buf);
+      buf = newbuf;
+    }
   }
 
-  private static String hex(byte b) {
-    return "" + HEXES.charAt((b & 0xF0) >> 4) + HEXES.charAt((b & 0x0F));
+  /** check if the rest of data has more than len bytes */
+  public static boolean verifyLength(XDR xdr, int len) {
+    return xdr.buf.remaining() >= len;
   }
 
-  private static byte[] recordMark(int size, boolean last) {
-    return toBytes(!last ? size : size | 0x80000000);
+  static byte[] recordMark(int size, boolean last) {
+    byte[] b = new byte[SIZEOF_INT];
+    ByteBuffer buf = ByteBuffer.wrap(b);
+    buf.putInt(!last ? size : size | 0x80000000);
+    return b;
   }
 
-  public static byte[] getVariableOpque(byte[] data) {
-    byte[] bytes = toBytes(data.length);
-    return append(bytes, Arrays.copyOf(data, data.length + XDR.pad(data.length, 4)));
+  /** Write an XDR message to a TCP ChannelBuffer */
+  public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
+    Preconditions.checkState(request.state == XDR.State.WRITING);
+    ByteBuffer b = request.buf.duplicate();
+    b.flip();
+    byte[] fragmentHeader = XDR.recordMark(b.limit(), last);
+    ByteBuffer headerBuf = ByteBuffer.wrap(fragmentHeader);
+
+    // TODO: Investigate whether making a copy of the buffer is necessary.
+    return ChannelBuffers.copiedBuffer(headerBuf, b);
+  }
+
+  /** Write an XDR message to a UDP ChannelBuffer */
+  public static ChannelBuffer writeMessageUdp(XDR response) {
+    Preconditions.checkState(response.state == XDR.State.READING);
+    // TODO: Investigate whether making a copy of the buffer is necessary.
+    return ChannelBuffers.copiedBuffer(response.buf);
   }
 
   public static int fragmentSize(byte[] mark) {
-    int n = (XDR.toShort(mark[0]) << 24) + (XDR.toShort(mark[1]) << 16)
-        + (XDR.toShort(mark[2]) << 8) + XDR.toShort(mark[3]);
+    ByteBuffer b = ByteBuffer.wrap(mark);
+    int n = b.getInt();
     return n & 0x7fffffff;
   }
 
   public static boolean isLastFragment(byte[] mark) {
-    int n = (XDR.toShort(mark[0]) << 24) + (XDR.toShort(mark[1]) << 16)
-        + (XDR.toShort(mark[2]) << 8) + XDR.toShort(mark[3]);
+    ByteBuffer b = ByteBuffer.wrap(mark);
+    int n = b.getInt();
     return (n & 0x80000000) != 0;
   }
 
-  /** check if the rest of data has more than <len> bytes */
-  public static boolean verifyLength(XDR xdr, int len) {
-    return (xdr.bytearr.length - xdr.cursor) >= len;
-  }
-
-  /** Write an XDR message to a TCP ChannelBuffer */
-  public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
-    byte[] fragmentHeader = XDR.recordMark(request.bytearr.length, last);
-    ChannelBuffer outBuf = ChannelBuffers.buffer(fragmentHeader.length
-        + request.bytearr.length);
-    outBuf.writeBytes(fragmentHeader);
-    outBuf.writeBytes(request.bytearr);
-    return outBuf;
-  }
+  @VisibleForTesting
+  public byte[] getBytes() {
+    ByteBuffer d = asReadOnlyWrap().buffer();
+    byte[] b = new byte[d.remaining()];
+    d.get(b);
 
-  /** Write an XDR message to a UDP ChannelBuffer */
-  public static ChannelBuffer writeMessageUdp(XDR response) {
-    ChannelBuffer outBuf = ChannelBuffers.buffer(response.bytearr.length);
-    outBuf.writeBytes(response.bytearr);
-    return outBuf;
+    return b;
   }
-}
+}
\ No newline at end of file

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java Wed Oct 30 22:21:59 2013
@@ -17,10 +17,11 @@
  */
 package org.apache.hadoop.portmap;
 
-import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
 import org.apache.hadoop.oncrpc.RpcCall;
 import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.CredentialsNone;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.apache.hadoop.portmap.PortmapInterface.Procedure;
 
 /**
@@ -33,14 +34,12 @@ public class PortmapRequest {
 
   public static XDR create(PortmapMapping mapping) {
     XDR request = new XDR();
-    RpcCall.write(request,
+    RpcCall call = RpcCall.getInstance(
         RpcUtil.getNewXid(String.valueOf(RpcProgramPortmap.PROGRAM)),
         RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION,
-        Procedure.PMAPPROC_SET.getValue());
-    request.writeInt(AuthFlavor.AUTH_NONE.getValue());
-    request.writeInt(0);
-    request.writeInt(0);
-    request.writeInt(0);
+        Procedure.PMAPPROC_SET.getValue(), new CredentialsNone(),
+        new VerifierNone());
+    call.write(request);
     return mapping.serialize(request);
   }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapResponse.java Wed Oct 30 22:21:59 2013
@@ -22,30 +22,31 @@ import java.util.Collection;
 
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.XDR;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
 
 /**
  * Helper utility for sending portmap response.
  */
 public class PortmapResponse {
   public static XDR voidReply(XDR xdr, int xid) {
-    RpcAcceptedReply.voidReply(xdr, xid);
+    RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(xdr);
     return xdr;
   }
 
   public static XDR intReply(XDR xdr, int xid, int value) {
-    RpcAcceptedReply.voidReply(xdr, xid);
+    RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(xdr);
     xdr.writeInt(value);
     return xdr;
   }
 
   public static XDR booleanReply(XDR xdr, int xid, boolean value) {
-    RpcAcceptedReply.voidReply(xdr, xid);
+    RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(xdr);
     xdr.writeBoolean(value);
     return xdr;
   }
 
   public static XDR pmapList(XDR xdr, int xid, Collection<PortmapMapping> list) {
-    RpcAcceptedReply.voidReply(xdr, xid);
+    RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(xdr);
     for (PortmapMapping mapping : list) {
       System.out.println(mapping);
       xdr.writeBoolean(true); // Value follows

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java Wed Oct 30 22:21:59 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.portmap;
 
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -26,9 +25,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.oncrpc.RpcAcceptedReply;
 import org.apache.hadoop.oncrpc.RpcCall;
+import org.apache.hadoop.oncrpc.RpcInfo;
 import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcResponse;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
-import org.jboss.netty.channel.Channel;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandlerContext;
 
 /**
  * An rpcbind request handler.
@@ -43,7 +48,7 @@ public class RpcProgramPortmap extends R
   private final HashMap<String, PortmapMapping> map;
 
   public RpcProgramPortmap() {
-    super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION, 0);
+    super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION);
     map = new HashMap<String, PortmapMapping>(256);
   }
 
@@ -129,10 +134,15 @@ public class RpcProgramPortmap extends R
   }
 
   @Override
-  public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
-      InetAddress client, Channel channel) {
+  public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
     final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure());
     int xid = rpcCall.getXid();
+    byte[] data = new byte[info.data().readableBytes()];
+    info.data().readBytes(data);
+    XDR in = new XDR(data);
+    XDR out = new XDR();
+
     if (portmapProc == Procedure.PMAPPROC_NULL) {
       out = nullOp(xid, in, out);
     } else if (portmapProc == Procedure.PMAPPROC_SET) {
@@ -147,10 +157,14 @@ public class RpcProgramPortmap extends R
       out = getport(xid, in, out);
     } else {
       LOG.info("PortmapHandler unknown rpc procedure=" + portmapProc);
-      RpcAcceptedReply.voidReply(out, xid,
-          RpcAcceptedReply.AcceptState.PROC_UNAVAIL);
+      RpcAcceptedReply reply = RpcAcceptedReply.getInstance(xid,
+          RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone());
+      reply.write(out);
     }
-    return out;
+
+    ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+    RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
+    RpcUtil.sendRpcResponse(ctx, rsp);
   }
   
   @Override

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/TestNfsTime.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/TestNfsTime.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/TestNfsTime.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/TestNfsTime.java Wed Oct 30 22:21:59 2013
@@ -39,7 +39,7 @@ public class TestNfsTime {
     t1.serialize(xdr);
     
     // Deserialize it back
-    NfsTime t2 = NfsTime.deserialize(xdr);
+    NfsTime t2 = NfsTime.deserialize(xdr.asReadOnlyWrap());
     
     // Ensure the NfsTimes are equal
     Assert.assertEquals(t1, t2);

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/nfs3/TestFileHandle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/nfs3/TestFileHandle.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/nfs3/TestFileHandle.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/nfs/nfs3/TestFileHandle.java Wed Oct 30 22:21:59 2013
@@ -33,7 +33,7 @@ public class TestFileHandle {
 
     // Deserialize it back 
     FileHandle handle2 = new FileHandle();
-    handle2.deserialize(xdr);
+    handle2.deserialize(xdr.asReadOnlyWrap());
     Assert.assertEquals(handle.getFileId(), 1024);
   }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java Wed Oct 30 22:21:59 2013
@@ -18,14 +18,18 @@
 
 package org.apache.hadoop.oncrpc;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
+import org.apache.hadoop.oncrpc.security.CredentialsNone;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.junit.Test;
@@ -34,7 +38,7 @@ import org.mockito.Mockito;
 public class TestFrameDecoder {
 
   private static int port = 12345; // some random server port
-  private static XDR result = null;
+  private static int resultSize;
 
   static void testRequest(XDR request) {
     SimpleTcpClient tcpClient = new SimpleTcpClient("localhost", port, request,
@@ -45,17 +49,20 @@ public class TestFrameDecoder {
   static class TestRpcProgram extends RpcProgram {
 
     protected TestRpcProgram(String program, String host, int port,
-        int progNumber, int lowProgVersion, int highProgVersion, int cacheSize) {
-      super(program, host, port, progNumber, lowProgVersion, highProgVersion,
-          cacheSize);
+        int progNumber, int lowProgVersion, int highProgVersion) {
+      super(program, host, port, progNumber, lowProgVersion, highProgVersion);
     }
 
     @Override
-    public XDR handleInternal(RpcCall rpcCall, XDR in, XDR out,
-        InetAddress client, Channel channel) {
-      // Get the final complete request and return a void response.
-      result = in;
-      return RpcAcceptedReply.voidReply(out, 1234);
+    protected void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+      resultSize = info.data().readableBytes();
+      RpcAcceptedReply reply = RpcAcceptedReply.getAcceptInstance(1234,
+          new VerifierNone());
+      XDR out = new XDR();
+      reply.write(out);
+      ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
+      RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
+      RpcUtil.sendRpcResponse(ctx, rsp);
     }
 
     @Override
@@ -135,42 +142,40 @@ public class TestFrameDecoder {
         buf);
     assertTrue(channelBuffer != null);
     // Complete frame should have to total size 10+10=20
-    assertTrue(channelBuffer.array().length == 20);
+    assertEquals(20, channelBuffer.readableBytes());
   }
 
   @Test
   public void testFrames() {
 
     RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
-        "localhost", port, 100000, 1, 2, 100);
+        "localhost", port, 100000, 1, 2);
     SimpleTcpServer tcpServer = new SimpleTcpServer(port, program, 1);
     tcpServer.run();
 
     XDR xdrOut = createGetportMount();
+    int headerSize = xdrOut.size();
     int bufsize = 2 * 1024 * 1024;
     byte[] buffer = new byte[bufsize];
     xdrOut.writeFixedOpaque(buffer);
-    int requestSize = xdrOut.size();
+    int requestSize = xdrOut.size() - headerSize;
 
     // Send the request to the server
     testRequest(xdrOut);
 
     // Verify the server got the request with right size
-    assertTrue(requestSize == result.size());
+    assertEquals(requestSize, resultSize);
   }
 
   static void createPortmapXDRheader(XDR xdr_out, int procedure) {
     // Make this a method
-    RpcCall.write(xdr_out, 0, 100000, 2, procedure);
+    RpcCall.getInstance(0, 100000, 2, procedure, new CredentialsNone(),
+        new VerifierNone()).write(xdr_out);
   }
 
   static XDR createGetportMount() {
     XDR xdr_out = new XDR();
     createPortmapXDRheader(xdr_out, 3);
-    xdr_out.writeInt(0); // AUTH_NULL
-    xdr_out.writeInt(0); // cred len
-    xdr_out.writeInt(0); // verifier AUTH_NULL
-    xdr_out.writeInt(0); // verf len
     return xdr_out;
   }
   /*
@@ -191,4 +196,4 @@ public class TestFrameDecoder {
    * static void testDump() { XDR xdr_out = new XDR();
    * createPortmapXDRheader(xdr_out, 4); testRequest(xdr_out); }
    */
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcAcceptedReply.java Wed Oct 30 22:21:59 2013
@@ -20,8 +20,9 @@ package org.apache.hadoop.oncrpc;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.oncrpc.RpcAcceptedReply.AcceptState;
-import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
 import org.apache.hadoop.oncrpc.RpcReply.ReplyState;
+import org.apache.hadoop.oncrpc.security.Verifier;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.junit.Test;
 
 /**
@@ -45,8 +46,8 @@ public class TestRpcAcceptedReply {
   
   @Test
   public void testConstructor() {
-    RpcAuthInfo verifier = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]);
-    RpcAcceptedReply reply = new RpcAcceptedReply(0, RpcMessage.Type.RPC_REPLY,
+    Verifier verifier = new VerifierNone();
+    RpcAcceptedReply reply = new RpcAcceptedReply(0, 
         ReplyState.MSG_ACCEPTED, verifier, AcceptState.SUCCESS);
     assertEquals(0, reply.getXid());
     assertEquals(RpcMessage.Type.RPC_REPLY, reply.getMessageType());

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCall.java Wed Oct 30 22:21:59 2013
@@ -17,8 +17,12 @@
  */
 package org.apache.hadoop.oncrpc;
 
-import org.apache.hadoop.oncrpc.RpcAuthInfo.AuthFlavor;
 import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.oncrpc.security.CredentialsNone;
+import org.apache.hadoop.oncrpc.security.Credentials;
+import org.apache.hadoop.oncrpc.security.Verifier;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.junit.Test;
 
 /**
@@ -28,8 +32,8 @@ public class TestRpcCall {
   
   @Test
   public void testConstructor() {
-    RpcAuthInfo credential = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]);
-    RpcAuthInfo verifier = new RpcAuthInfo(AuthFlavor.AUTH_NONE, new byte[0]);
+    Credentials credential = new CredentialsNone();
+    Verifier verifier = new VerifierNone();
     int rpcVersion = RpcCall.RPC_VERSION;
     int program = 2;
     int version = 3;

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcCallCache.java Wed Oct 30 22:21:59 2013
@@ -32,6 +32,8 @@ import org.apache.hadoop.oncrpc.RpcCallC
 import org.apache.hadoop.oncrpc.RpcCallCache.ClientRequest;
 import org.junit.Test;
 
+import static org.mockito.Mockito.*;
+
 /**
  * Unit tests for {@link RpcCallCache}
  */
@@ -67,7 +69,7 @@ public class TestRpcCallCache {
     validateInprogressCacheEntry(e);
     
     // Set call as completed
-    XDR response = new XDR();
+    RpcResponse response = mock(RpcResponse.class);
     cache.callCompleted(clientIp, xid, response);
     e = cache.checkOrAddToCache(clientIp, xid);
     validateCompletedCacheEntry(e, response);
@@ -79,7 +81,7 @@ public class TestRpcCallCache {
     assertNull(c.getResponse());
   }
   
-  private void validateCompletedCacheEntry(CacheEntry c, XDR response) {
+  private void validateCompletedCacheEntry(CacheEntry c, RpcResponse response) {
     assertFalse(c.isInProgress());
     assertTrue(c.isCompleted());
     assertEquals(response, c.getResponse());
@@ -93,7 +95,7 @@ public class TestRpcCallCache {
     assertFalse(c.isCompleted());
     assertNull(c.getResponse());
     
-    XDR response = new XDR();
+    RpcResponse response = mock(RpcResponse.class);
     c.setResponse(response);
     validateCompletedCacheEntry(c, response);
   }

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcDeniedReply.java Wed Oct 30 22:21:59 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.oncrpc;
 
 import org.apache.hadoop.oncrpc.RpcDeniedReply.RejectState;
 import org.apache.hadoop.oncrpc.RpcReply.ReplyState;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -39,10 +40,8 @@ public class TestRpcDeniedReply {
   
   @Test
   public void testConstructor() {
-    RpcDeniedReply reply = new RpcDeniedReply(0, RpcMessage.Type.RPC_REPLY,
-        ReplyState.MSG_ACCEPTED, RejectState.AUTH_ERROR) {
-      // Anonymous class
-    };
+    RpcDeniedReply reply = new RpcDeniedReply(0, ReplyState.MSG_ACCEPTED,
+        RejectState.AUTH_ERROR, new VerifierNone());
     Assert.assertEquals(0, reply.getXid());
     Assert.assertEquals(RpcMessage.Type.RPC_REPLY, reply.getMessageType());
     Assert.assertEquals(ReplyState.MSG_ACCEPTED, reply.getState());

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcMessage.java Wed Oct 30 22:21:59 2013
@@ -26,7 +26,10 @@ import org.junit.Test;
 public class TestRpcMessage {
   private RpcMessage getRpcMessage(int xid, RpcMessage.Type msgType) {
     return new RpcMessage(xid, msgType) {
-      // Anonymous class
+      @Override
+      public XDR write(XDR xdr) {
+        return null;
+      }
     };
   }
   

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestRpcReply.java Wed Oct 30 22:21:59 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.oncrpc;
 
 
 import org.apache.hadoop.oncrpc.RpcReply.ReplyState;
+import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -39,8 +40,12 @@ public class TestRpcReply {
   
   @Test
   public void testRpcReply() {
-    RpcReply reply = new RpcReply(0, RpcMessage.Type.RPC_REPLY, ReplyState.MSG_ACCEPTED) {
-      // Anonymous class
+    RpcReply reply = new RpcReply(0, ReplyState.MSG_ACCEPTED,
+        new VerifierNone()) {
+          @Override
+          public XDR write(XDR xdr) {
+            return null;
+          }
     };
     Assert.assertEquals(0, reply.getXid());
     Assert.assertEquals(RpcMessage.Type.RPC_REPLY, reply.getMessageType());

Modified: hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java Wed Oct 30 22:21:59 2013
@@ -17,23 +17,34 @@
  */
 package org.apache.hadoop.oncrpc;
 
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-
+import org.junit.Assert;
 import org.junit.Test;
 
-/**
- * Tests for {@link XDR}
- */
 public class TestXDR {
-  /**
-   * Test {@link XDR#append(byte[], byte[])}
-   */
+  private void serializeInt(int times) {
+    XDR w = new XDR();
+    for (int i = 0; i < times; ++i)
+      w.writeInt(23);
+
+    XDR r = w.asReadOnlyWrap();
+    for (int i = 0; i < times; ++i)
+      Assert.assertEquals(r.readInt(), 23);
+  }
+
+  private void serializeLong(int times) {
+    XDR w = new XDR();
+    for (int i = 0; i < times; ++i)
+      w.writeLongAsHyper(23);
+
+    XDR r = w.asReadOnlyWrap();
+    for (int i = 0; i < times; ++i)
+      Assert.assertEquals(r.readHyper(), 23);
+  }
+
   @Test
-  public void testAppendBytes() {
-    byte[] arr1 = new byte[] {0, 1};
-    byte[] arr2 = new byte[] {2, 3};
-    assertTrue(Arrays.equals(new byte[]{0, 1, 2, 3}, XDR.append(arr1, arr2)));
+  public void testPerformance() {
+    final int TEST_TIMES = 8 << 20;
+    serializeInt(TEST_TIMES);
+    serializeLong(TEST_TIMES);
   }
 }



Mime
View raw message