hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brando...@apache.org
Subject svn commit: r1525104 - in /hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src: main/java/org/apache/hadoop/nfs/nfs3/ main/java/org/apache/hadoop/oncrpc/ test/java/org/apache/hadoop/oncrpc/
Date Fri, 20 Sep 2013 19:03:51 GMT
Author: brandonli
Date: Fri Sep 20 19:03:51 2013
New Revision: 1525104

URL: http://svn.apache.org/r1525104
Log:
HDFS-5234 Move RpcFrameDecoder out of the public API. Contributed by Haohui Mai

Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java?rev=1525104&r1=1525103&r2=1525104&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java
Fri Sep 20 19:03:51 2013
@@ -20,8 +20,8 @@ package org.apache.hadoop.nfs.nfs3;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mount.MountdBase;
-import org.apache.hadoop.oncrpc.RpcFrameDecoder;
 import org.apache.hadoop.oncrpc.RpcProgram;
+import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.SimpleTcpServer;
 import org.apache.hadoop.oncrpc.SimpleTcpServerHandler;
 import org.apache.hadoop.portmap.PortmapMapping;
@@ -68,7 +68,8 @@ public abstract class Nfs3Base {
         return new ChannelPipelineFactory() {
           @Override
           public ChannelPipeline getPipeline() {
-            return Channels.pipeline(new RpcFrameDecoder(),
+            return Channels.pipeline(
+                RpcUtil.constructRpcFrameDecoder(),
                 new SimpleTcpServerHandler(rpcProgram));
           }
         };

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java?rev=1525104&r1=1525103&r2=1525104&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcUtil.java
Fri Sep 20 19:03:51 2013
@@ -17,13 +17,65 @@
  */
 package org.apache.hadoop.oncrpc;
 
-/**
- * The XID in RPC call. It is used for starting with new seed after each reboot.
- */
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
 public class RpcUtil {
+  /**
+   * The XID in RPC call. It is used for starting with new seed after each reboot.
+   */
   private static int xid = (int) (System.currentTimeMillis() / 1000) << 12;
 
   public static int getNewXid(String caller) {
     return xid = ++xid + caller.hashCode();
   }
+
+  public static FrameDecoder constructRpcFrameDecoder() {
+    return new RpcFrameDecoder();
+  }
+
+  static class RpcFrameDecoder extends FrameDecoder {
+    public static final Log LOG = LogFactory.getLog(RpcFrameDecoder.class);
+    private ChannelBuffer currentFrame;
+
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, Channel channel,
+        ChannelBuffer buf) {
+
+      if (buf.readableBytes() < 4)
+        return null;
+
+      buf.markReaderIndex();
+
+      byte[] fragmentHeader = new byte[4];
+      buf.readBytes(fragmentHeader);
+      int length = XDR.fragmentSize(fragmentHeader);
+      boolean isLast = XDR.isLastFragment(fragmentHeader);
+
+      if (buf.readableBytes() < length) {
+        buf.resetReaderIndex();
+        return null;
+      }
+
+      ChannelBuffer newFragment = buf.readSlice(length);
+      if (currentFrame == null) {
+        currentFrame = newFragment;
+      } else {
+        currentFrame = ChannelBuffers.wrappedBuffer(currentFrame, newFragment);
+      }
+
+      if (isLast) {
+        ChannelBuffer completeFrame = currentFrame;
+        currentFrame = null;
+        return completeFrame;
+      } else {
+        return null;
+      }
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java?rev=1525104&r1=1525103&r2=1525104&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpClient.java
Fri Sep 20 19:03:51 2013
@@ -20,8 +20,6 @@ package org.apache.hadoop.oncrpc;
 import java.net.InetSocketAddress;
 import java.util.concurrent.Executors;
 
-import org.apache.hadoop.oncrpc.RpcFrameDecoder;
-import org.apache.hadoop.oncrpc.XDR;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
@@ -55,7 +53,8 @@ public class SimpleTcpClient {
     this.pipelineFactory = new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() {
-        return Channels.pipeline(new RpcFrameDecoder(),
+        return Channels.pipeline(
+            RpcUtil.constructRpcFrameDecoder(),
             new SimpleTcpClientHandler(request));
       }
     };

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java?rev=1525104&r1=1525103&r2=1525104&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java
Fri Sep 20 19:03:51 2013
@@ -57,7 +57,8 @@ public class SimpleTcpServer {
     return new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() {
-        return Channels.pipeline(new RpcFrameDecoder(),
+        return Channels.pipeline(
+            RpcUtil.constructRpcFrameDecoder(),
             new SimpleTcpServerHandler(rpcProgram));
       }
     };

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java?rev=1525104&r1=1525103&r2=1525104&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServerHandler.java
Fri Sep 20 19:03:51 2013
@@ -44,7 +44,7 @@ public class SimpleTcpServerHandler exte
   @Override
   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
     ChannelBuffer buf = (ChannelBuffer) e.getMessage();
-    XDR request = new XDR(buf.array());
+    XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING);
     
     InetAddress remoteInetAddr = ((InetSocketAddress) ctx.getChannel()
         .getRemoteAddress()).getAddress();

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java?rev=1525104&r1=1525103&r2=1525104&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServerHandler.java
Fri Sep 20 19:03:51 2013
@@ -43,7 +43,7 @@ public class SimpleUdpServerHandler exte
   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
     ChannelBuffer buf = (ChannelBuffer) e.getMessage();
 
-    XDR request = new XDR(buf.array());
+    XDR request = new XDR(buf.toByteBuffer().asReadOnlyBuffer(), XDR.State.READING);
     
     InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress())
         .getAddress();

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java?rev=1525104&r1=1525103&r2=1525104&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java
Fri Sep 20 19:03:51 2013
@@ -46,7 +46,7 @@ public final class XDR {
 
   private ByteBuffer buf;
 
-  private enum State {
+  public enum State {
     READING, WRITING,
   }
 
@@ -66,7 +66,7 @@ public final class XDR {
     this(DEFAULT_INITIAL_CAPACITY);
   }
 
-  private XDR(ByteBuffer buf, State state) {
+  public XDR(ByteBuffer buf, State state) {
     this.buf = buf;
     this.state = state;
   }

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java?rev=1525104&r1=1525103&r2=1525104&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java
Fri Sep 20 19:03:51 2013
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.oncrpc;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
 import org.apache.hadoop.oncrpc.security.CredentialsNone;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
@@ -138,7 +140,7 @@ public class TestFrameDecoder {
         buf);
     assertTrue(channelBuffer != null);
     // Complete frame should have to total size 10+10=20
-    assertTrue(channelBuffer.array().length == 20);
+    assertEquals(20, channelBuffer.readableBytes());
   }
 
   @Test
@@ -195,4 +197,4 @@ public class TestFrameDecoder {
    * static void testDump() { XDR xdr_out = new XDR();
    * createPortmapXDRheader(xdr_out, 4); testRequest(xdr_out); }
    */
-}
\ No newline at end of file
+}

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java?rev=1525104&r1=1525103&r2=1525104&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestXDR.java
Fri Sep 20 19:03:51 2013
@@ -17,10 +17,9 @@
  */
 package org.apache.hadoop.oncrpc;
 
+import org.junit.Assert;
 import org.junit.Test;
 
-import junit.framework.Assert;
-
 public class TestXDR {
   private void serializeInt(int times) {
     XDR w = new XDR();



Mime
View raw message