hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1570704 - in /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/
Date Fri, 21 Feb 2014 20:54:53 GMT
Author: arp
Date: Fri Feb 21 20:54:53 2014
New Revision: 1570704

URL: http://svn.apache.org/r1570704
Log:
HADOOP-10278. Merging r1570703 from trunk to branch-2

Added:
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
      - copied unchanged from r1570703, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
      - copied unchanged from r1570703, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
Modified:
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1570704&r1=1570703&r2=1570704&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Fri Feb
21 20:54:53 2014
@@ -8,6 +8,9 @@ Release 2.5.0 - UNRELEASED
 
   IMPROVEMENTS
 
+    HADOOP-10278. Refactor to make CallQueue pluggable. (Chris Li via
+    Arpit Agarwal)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1570704&r1=1570703&r2=1570704&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
Fri Feb 21 20:54:53 2014
@@ -82,6 +82,14 @@ public class CommonConfigurationKeys ext
   /** Default value for IPC_SERVER_HANDLER_QUEUE_SIZE_KEY */
   public static final int     IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT = 100;
 
+  /**
+   * CallQueue related settings. These are not used directly, but rather
+   * combined with a namespace and port. For instance:
+   * IPC_CALLQUEUE_NAMESPACE + ".8020." + IPC_CALLQUEUE_IMPL_KEY
+   */
+  public static final String IPC_CALLQUEUE_NAMESPACE = "ipc";
+  public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
+
   /** Internal buffer size for Lzo compressor/decompressors */
   public static final String  IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY =
     "io.compression.codec.lzo.buffersize";

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1570704&r1=1570703&r2=1570704&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
Fri Feb 21 20:54:53 2014
@@ -363,7 +363,7 @@ public abstract class Server {
   private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
 
   volatile private boolean running = true;         // true while server runs
-  private BlockingQueue<Call> callQueue; // queued calls
+  private CallQueueManager<Call> callQueue;
 
   // maintains the set of client connections and handles idle timeouts
   private ConnectionManager connectionManager;
@@ -467,6 +467,19 @@ public abstract class Server {
     return serviceAuthorizationManager;
   }
 
+  /*
+   * Refresh the call queue
+   */
+  public synchronized void refreshCallQueue(Configuration conf) {
+    // Create the next queue
+    String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." +
+      this.port;
+    Class queueClassToUse = conf.getClass(prefix + "." +
+      CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class);
+
+    callQueue.swapQueue(queueClassToUse, maxQueueSize, prefix, conf);
+  }
+
   /** A call queued for handling. */
   public static class Call {
     private final int callId;             // the client's call id
@@ -2108,7 +2121,15 @@ public abstract class Server {
     this.readerPendingConnectionQueue = conf.getInt(
         CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
         CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
-    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
+
+    // Setup appropriate callqueue
+    String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." +
+        this.port;
+    Class queueClassToUse = conf.getClass(prefix + "." +
+        CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class);
+    this.callQueue = new CallQueueManager<Call>(queueClassToUse, maxQueueSize,
+      prefix, conf);
+
     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
     this.authorize = 
       conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, 



Mime
View raw message