hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1573065 - in /hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/
Date Fri, 28 Feb 2014 20:39:49 GMT
Author: arp
Date: Fri Feb 28 20:39:48 2014
New Revision: 1573065

URL: http://svn.apache.org/r1573065
Log:
HADOOP-10278. Merging r1570704 from branch-2 to branch-2.4.

Added:
    hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
      - copied unchanged from r1570704, hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
    hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
      - copied unchanged from r1570704, hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
Modified:
    hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
    hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Modified: hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1573065&r1=1573064&r2=1573065&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/CHANGES.txt Fri Feb 28 20:39:48 2014
@@ -31,6 +31,9 @@ Release 2.4.0 - UNRELEASED
     HADOOP-10374. InterfaceAudience annotations should have
     RetentionPolicy.RUNTIME (Enis Soztutar via Arpit Agarwal)
 
+    HADOOP-10278. Refactor to make CallQueue pluggable. (Chris Li via
+    Arpit Agarwal)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1573065&r1=1573064&r2=1573065&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java Fri Feb 28 20:39:48 2014
@@ -82,6 +82,14 @@ public class CommonConfigurationKeys ext
   /** Default value for IPC_SERVER_HANDLER_QUEUE_SIZE_KEY */
   public static final int     IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT = 100;
 
+  /**
+   * CallQueue related settings. These are not used directly, but rather
+   * combined with a namespace and port. For instance:
+   * IPC_CALLQUEUE_NAMESPACE + ".8020." + IPC_CALLQUEUE_IMPL_KEY
+   */
+  public static final String IPC_CALLQUEUE_NAMESPACE = "ipc";
+  public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
+
   /** Internal buffer size for Lzo compressor/decompressors */
   public static final String  IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY =
     "io.compression.codec.lzo.buffersize";

Modified: hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1573065&r1=1573064&r2=1573065&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Fri Feb 28 20:39:48 2014
@@ -363,7 +363,7 @@ public abstract class Server {
   private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
 
   volatile private boolean running = true;         // true while server runs
-  private BlockingQueue<Call> callQueue; // queued calls
+  private CallQueueManager<Call> callQueue;
 
   // maintains the set of client connections and handles idle timeouts
   private ConnectionManager connectionManager;
@@ -467,6 +467,19 @@ public abstract class Server {
     return serviceAuthorizationManager;
   }
 
+  /*
+   * Refresh the call queue
+   */
+  public synchronized void refreshCallQueue(Configuration conf) {
+    // Create the next queue
+    String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." +
+      this.port;
+    Class queueClassToUse = conf.getClass(prefix + "." +
+      CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class);
+
+    callQueue.swapQueue(queueClassToUse, maxQueueSize, prefix, conf);
+  }
+
   /** A call queued for handling. */
   public static class Call {
     private final int callId;             // the client's call id
@@ -2108,7 +2121,15 @@ public abstract class Server {
     this.readerPendingConnectionQueue = conf.getInt(
         CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
         CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
-    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
+
+    // Setup appropriate callqueue
+    String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." +
+        this.port;
+    Class queueClassToUse = conf.getClass(prefix + "." +
+        CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class);
+    this.callQueue = new CallQueueManager<Call>(queueClassToUse, maxQueueSize,
+      prefix, conf);
+
     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
     this.authorize = 
       conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, 



Mime
View raw message