hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brando...@apache.org
Subject svn commit: r1545756 - in /hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src: main/java/org/apache/hadoop/portmap/ test/java/org/apache/hadoop/portmap/
Date Tue, 26 Nov 2013 18:13:05 GMT
Author: brandonli
Date: Tue Nov 26 18:13:04 2013
New Revision: 1545756

URL: http://svn.apache.org/r1545756
Log:
HDFS-5548. Use ConcurrentHashMap in portmap. Contributed by Haohui Mai

Removed:
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapInterface.java
Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
    hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java?rev=1545756&r1=1545755&r2=1545756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java
Tue Nov 26 18:13:04 2013
@@ -22,7 +22,6 @@ import org.apache.hadoop.oncrpc.RpcUtil;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.CredentialsNone;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.apache.hadoop.portmap.PortmapInterface.Procedure;
 
 /**
  * Helper utility for building portmap request
@@ -37,7 +36,7 @@ public class PortmapRequest {
     RpcCall call = RpcCall.getInstance(
         RpcUtil.getNewXid(String.valueOf(RpcProgramPortmap.PROGRAM)),
         RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION,
-        Procedure.PMAPPROC_SET.getValue(), new CredentialsNone(),
+        RpcProgramPortmap.PMAPPROC_SET, new CredentialsNone(),
         new VerifierNone());
     call.write(request);
     return mapping.serialize(request);

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java?rev=1545756&r1=1545755&r2=1545756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/RpcProgramPortmap.java
Tue Nov 26 18:13:04 2013
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.portmap;
 
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,20 +40,26 @@ import org.jboss.netty.handler.timeout.I
 import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
 import org.jboss.netty.handler.timeout.IdleStateEvent;
 
-final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler implements PortmapInterface
{
+final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler {
   static final int PROGRAM = 100000;
   static final int VERSION = 2;
+
+  static final int PMAPPROC_NULL = 0;
+  static final int PMAPPROC_SET = 1;
+  static final int PMAPPROC_UNSET = 2;
+  static final int PMAPPROC_GETPORT = 3;
+  static final int PMAPPROC_DUMP = 4;
+  static final int PMAPPROC_GETVERSADDR = 9;
+
   private static final Log LOG = LogFactory.getLog(RpcProgramPortmap.class);
 
-  /** Map synchronized usis monitor lock of this instance */
-  private final HashMap<String, PortmapMapping> map;
+  private final ConcurrentHashMap<String, PortmapMapping> map = new ConcurrentHashMap<String,
PortmapMapping>();
 
   /** ChannelGroup that remembers all active channels for gracefully shutdown. */
   private final ChannelGroup allChannels;
 
   RpcProgramPortmap(ChannelGroup allChannels) {
     this.allChannels = allChannels;
-    map = new HashMap<String, PortmapMapping>(256);
     PortmapMapping m = new PortmapMapping(PROGRAM, VERSION,
         PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT);
     PortmapMapping m1 = new PortmapMapping(PROGRAM, VERSION,
@@ -61,48 +67,66 @@ final class RpcProgramPortmap extends Id
     map.put(PortmapMapping.key(m), m);
     map.put(PortmapMapping.key(m1), m1);
   }
-  
-  @Override
-  public XDR nullOp(int xid, XDR in, XDR out) {
+
+  /**
+   * This procedure does no work. By convention, procedure zero of any protocol
+   * takes no parameters and returns no results.
+   */
+  private XDR nullOp(int xid, XDR in, XDR out) {
     return PortmapResponse.voidReply(out, xid);
   }
 
-  @Override
-  public XDR set(int xid, XDR in, XDR out) {
+  /**
+   * When a program first becomes available on a machine, it registers itself
+   * with the port mapper program on the same machine. The program passes its
+   * program number "prog", version number "vers", transport protocol number
+   * "prot", and the port "port" on which it awaits service request. The
+   * procedure returns a boolean reply whose value is "TRUE" if the procedure
+   * successfully established the mapping and "FALSE" otherwise. The procedure
+   * refuses to establish a mapping if one already exists for the tuple
+   * "(prog, vers, prot)".
+   */
+  private XDR set(int xid, XDR in, XDR out) {
     PortmapMapping mapping = PortmapRequest.mapping(in);
     String key = PortmapMapping.key(mapping);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Portmap set key=" + key);
     }
 
-    PortmapMapping value = null;
-    synchronized(this) {
-      map.put(key, mapping);
-      value = map.get(key);
-    }  
-    return PortmapResponse.intReply(out, xid, value.getPort());
+    map.put(key, mapping);
+    return PortmapResponse.intReply(out, xid, mapping.getPort());
   }
 
-  @Override
-  public synchronized XDR unset(int xid, XDR in, XDR out) {
+  /**
+   * When a program becomes unavailable, it should unregister itself with the
+   * port mapper program on the same machine. The parameters and results have
+   * meanings identical to those of "PMAPPROC_SET". The protocol and port number
+   * fields of the argument are ignored.
+   */
+  private XDR unset(int xid, XDR in, XDR out) {
     PortmapMapping mapping = PortmapRequest.mapping(in);
-    synchronized(this) {
-      map.remove(PortmapMapping.key(mapping));
-    }
+    String key = PortmapMapping.key(mapping);
+
+    if (LOG.isDebugEnabled())
+      LOG.debug("Portmap remove key=" + key);
+
+    map.remove(key);
     return PortmapResponse.booleanReply(out, xid, true);
   }
 
-  @Override
-  public synchronized XDR getport(int xid, XDR in, XDR out) {
+  /**
+   * Given a program number "prog", version number "vers", and transport
+   * protocol number "prot", this procedure returns the port number on which the
+   * program is awaiting call requests. A port value of zeros means the program
+   * has not been registered. The "port" field of the argument is ignored.
+   */
+  private XDR getport(int xid, XDR in, XDR out) {
     PortmapMapping mapping = PortmapRequest.mapping(in);
     String key = PortmapMapping.key(mapping);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Portmap GETPORT key=" + key + " " + mapping);
     }
-    PortmapMapping value = null;
-    synchronized(this) {
-      value = map.get(key);
-    }
+    PortmapMapping value = map.get(key);
     int res = 0;
     if (value != null) {
       res = value.getPort();
@@ -115,13 +139,13 @@ final class RpcProgramPortmap extends Id
     return PortmapResponse.intReply(out, xid, res);
   }
 
-  @Override
-  public synchronized XDR dump(int xid, XDR in, XDR out) {
-    PortmapMapping[] pmapList = null;
-    synchronized(this) {
-      pmapList = new PortmapMapping[map.values().size()];
-      map.values().toArray(pmapList);
-    }
+  /**
+   * This procedure enumerates all entries in the port mapper's database. The
+   * procedure takes no parameters and returns a list of program, version,
+   * protocol, and port values.
+   */
+  private XDR dump(int xid, XDR in, XDR out) {
+    PortmapMapping[] pmapList = map.values().toArray(new PortmapMapping[0]);
     return PortmapResponse.pmapList(out, xid, pmapList);
   }
 
@@ -131,23 +155,23 @@ final class RpcProgramPortmap extends Id
 
     RpcInfo info = (RpcInfo) e.getMessage();
     RpcCall rpcCall = (RpcCall) info.header();
-    final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure());
+    final int portmapProc = rpcCall.getProcedure();
     int xid = rpcCall.getXid();
     XDR in = new XDR(info.data().toByteBuffer().asReadOnlyBuffer(),
         XDR.State.READING);
     XDR out = new XDR();
 
-    if (portmapProc == Procedure.PMAPPROC_NULL) {
+    if (portmapProc == PMAPPROC_NULL) {
       out = nullOp(xid, in, out);
-    } else if (portmapProc == Procedure.PMAPPROC_SET) {
+    } else if (portmapProc == PMAPPROC_SET) {
       out = set(xid, in, out);
-    } else if (portmapProc == Procedure.PMAPPROC_UNSET) {
+    } else if (portmapProc == PMAPPROC_UNSET) {
       out = unset(xid, in, out);
-    } else if (portmapProc == Procedure.PMAPPROC_DUMP) {
+    } else if (portmapProc == PMAPPROC_DUMP) {
       out = dump(xid, in, out);
-    } else if (portmapProc == Procedure.PMAPPROC_GETPORT) {
+    } else if (portmapProc == PMAPPROC_GETPORT) {
       out = getport(xid, in, out);
-    } else if (portmapProc == Procedure.PMAPPROC_GETVERSADDR) {
+    } else if (portmapProc == PMAPPROC_GETVERSADDR) {
       out = getport(xid, in, out);
     } else {
       LOG.info("PortmapHandler unknown rpc procedure=" + portmapProc);
@@ -161,7 +185,7 @@ final class RpcProgramPortmap extends Id
     RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
     RpcUtil.sendRpcResponse(ctx, rsp);
   }
-  
+
   @Override
   public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
       throws Exception {

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java?rev=1545756&r1=1545755&r2=1545756&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/portmap/TestPortmap.java
Tue Nov 26 18:13:04 2013
@@ -23,7 +23,7 @@ import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.HashMap;
+import java.util.Map;
 
 import junit.framework.Assert;
 
@@ -80,7 +80,7 @@ public class TestPortmap {
     XDR req = new XDR();
     RpcCall.getInstance(++xid, RpcProgramPortmap.PROGRAM,
         RpcProgramPortmap.VERSION,
-        PortmapInterface.Procedure.PMAPPROC_SET.getValue(),
+        RpcProgramPortmap.PMAPPROC_SET,
         new CredentialsNone(), new VerifierNone()).write(req);
 
     PortmapMapping sent = new PortmapMapping(90000, 1,
@@ -101,7 +101,7 @@ public class TestPortmap {
     Thread.sleep(100);
     boolean found = false;
     @SuppressWarnings("unchecked")
-    HashMap<String, PortmapMapping> map = (HashMap<String, PortmapMapping>) Whitebox
+    Map<String, PortmapMapping> map = (Map<String, PortmapMapping>) Whitebox
         .getInternalState(pm.getHandler(), "map");
 
     for (PortmapMapping m : map.values()) {



Mime
View raw message