hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1577710 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/
Date Fri, 14 Mar 2014 21:12:25 GMT
Author: szetszwo
Date: Fri Mar 14 21:12:25 2014
New Revision: 1577710

URL: http://svn.apache.org/r1577710
Log:
HADOOP-10407. Fix the javac warnings in org.apache.hadoop.ipc package.

Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1577710&r1=1577709&r2=1577710&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Fri Mar 14 21:12:25 2014
@@ -423,6 +423,9 @@ Release 2.4.0 - UNRELEASED
     HADOOP-10337 ConcurrentModificationException from
     MetricsDynamicMBeanBase.createMBeanInfo() (Liang Xie via stack)
 
+    HADOOP-10407. Fix the javac warnings in org.apache.hadoop.ipc package.
+    (szetszwo)
+
   BREAKDOWN OF HADOOP-10184 SUBTASKS AND RELATED JIRAS
 
     HADOOP-10185. FileSystem API for ACLs. (cnauroth)

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java?rev=1577710&r1=1577709&r2=1577710&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java Fri Mar 14 21:12:25 2014
@@ -18,15 +18,13 @@
 
 package org.apache.hadoop.ipc;
 
+import java.lang.reflect.Constructor;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.TimeUnit;
-
-import java.lang.reflect.Constructor;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -35,13 +33,19 @@ import org.apache.hadoop.conf.Configurat
 public class CallQueueManager<E> {
   public static final Log LOG = LogFactory.getLog(CallQueueManager.class);
 
+  @SuppressWarnings("unchecked")
+  static <E> Class<? extends BlockingQueue<E>> convertQueueClass(
+      Class<?> queneClass, Class<E> elementClass) {
+    return (Class<? extends BlockingQueue<E>>)queneClass;
+  }
+  
   // Atomic refs point to active callQueue
   // We have two so we can better control swapping
   private final AtomicReference<BlockingQueue<E>> putRef;
   private final AtomicReference<BlockingQueue<E>> takeRef;
 
-  public CallQueueManager(Class backingClass, int maxQueueSize,
-    String namespace, Configuration conf) {
+  public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
+      int maxQueueSize, String namespace, Configuration conf) {
     BlockingQueue<E> bq = createCallQueueInstance(backingClass,
       maxQueueSize, namespace, conf);
     this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
@@ -49,15 +53,14 @@ public class CallQueueManager<E> {
     LOG.info("Using callQueue " + backingClass);
   }
 
-  @SuppressWarnings("unchecked")
-  private BlockingQueue<E> createCallQueueInstance(Class theClass, int maxLen,
-    String ns, Configuration conf) {
+  private <T extends BlockingQueue<E>> T createCallQueueInstance(
+      Class<T> theClass, int maxLen, String ns, Configuration conf) {
 
     // Used for custom, configurable callqueues
     try {
-      Constructor ctor = theClass.getDeclaredConstructor(int.class, String.class,
+      Constructor<T> ctor = theClass.getDeclaredConstructor(int.class, String.class,
         Configuration.class);
-      return (BlockingQueue<E>)ctor.newInstance(maxLen, ns, conf);
+      return ctor.newInstance(maxLen, ns, conf);
     } catch (RuntimeException e) {
       throw e;
     } catch (Exception e) {
@@ -65,8 +68,8 @@ public class CallQueueManager<E> {
 
     // Used for LinkedBlockingQueue, ArrayBlockingQueue, etc
     try {
-      Constructor ctor = theClass.getDeclaredConstructor(int.class);
-      return (BlockingQueue<E>)ctor.newInstance(maxLen);
+      Constructor<T> ctor = theClass.getDeclaredConstructor(int.class);
+      return ctor.newInstance(maxLen);
     } catch (RuntimeException e) {
       throw e;
     } catch (Exception e) {
@@ -74,8 +77,8 @@ public class CallQueueManager<E> {
 
     // Last attempt
     try {
-      Constructor ctor = theClass.getDeclaredConstructor();
-      return (BlockingQueue<E>)ctor.newInstance();
+      Constructor<T> ctor = theClass.getDeclaredConstructor();
+      return ctor.newInstance();
     } catch (RuntimeException e) {
       throw e;
     } catch (Exception e) {
@@ -117,8 +120,9 @@ public class CallQueueManager<E> {
    * Replaces active queue with the newly requested one and transfers
    * all calls to the newQ before returning.
    */
-  public synchronized void swapQueue(Class queueClassToUse, int maxSize,
-    String ns, Configuration conf) {
+  public synchronized void swapQueue(
+      Class<? extends BlockingQueue<E>> queueClassToUse, int maxSize,
+      String ns, Configuration conf) {
     BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse, maxSize,
       ns, conf);
 
@@ -143,7 +147,7 @@ public class CallQueueManager<E> {
    * This doesn't mean the queue might not fill up at some point later, but
    * it should decrease the probability that we lose a call this way.
    */
-  private boolean queueIsReallyEmpty(BlockingQueue q) {
+  private boolean queueIsReallyEmpty(BlockingQueue<?> q) {
     boolean wasEmpty = q.isEmpty();
     try {
       Thread.sleep(10);

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1577710&r1=1577709&r2=1577710&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Fri Mar 14 21:12:25 2014
@@ -541,7 +541,7 @@ public class Client {
     }
     
     private synchronized AuthMethod setupSaslConnection(final InputStream in2, 
-        final OutputStream out2) throws IOException, InterruptedException {
+        final OutputStream out2) throws IOException {
       // Do not use Client.conf here! We must use ConnectionId.conf, since the
       // Client object is cached and shared between all RPC clients, even those
       // for separate services.

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1577710&r1=1577709&r2=1577710&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Fri Mar 14 21:12:25 2014
@@ -18,6 +18,12 @@
 
 package org.apache.hadoop.ipc;
 
+import static org.apache.hadoop.ipc.RpcConstants.AUTHORIZATION_FAILED_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
+import static org.apache.hadoop.ipc.RpcConstants.HEADER_LEN_AFTER_HRPC_PART;
+import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -75,8 +81,6 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import static org.apache.hadoop.ipc.RpcConstants.*;
-
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
@@ -469,17 +473,24 @@ public abstract class Server {
     return serviceAuthorizationManager;
   }
 
+  static Class<? extends BlockingQueue<Call>> getQueueClass(
+      String prefix, Configuration conf) {
+    String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
+    Class<?> queueClass = conf.getClass(name, LinkedBlockingQueue.class);
+    return CallQueueManager.convertQueueClass(queueClass, Call.class);
+  }
+
+  private String getQueueClassPrefix() {
+    return CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + port;
+  }
+
   /*
    * Refresh the call queue
    */
   public synchronized void refreshCallQueue(Configuration conf) {
     // Create the next queue
-    String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." +
-      this.port;
-    Class queueClassToUse = conf.getClass(prefix + "." +
-      CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class);
-
-    callQueue.swapQueue(queueClassToUse, maxQueueSize, prefix, conf);
+    String prefix = getQueueClassPrefix();
+    callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
   }
 
   /** A call queued for handling. */
@@ -1239,9 +1250,9 @@ public abstract class Server {
       Throwable cause = e;
       while (cause != null) {
         if (cause instanceof RetriableException) {
-          return (RetriableException) cause;
+          return cause;
         } else if (cause instanceof StandbyException) {
-          return (StandbyException) cause;
+          return cause;
         } else if (cause instanceof InvalidToken) {
           // FIXME: hadoop method signatures are restricting the SASL
           // callbacks to only returning InvalidToken, but some services
@@ -1335,7 +1346,7 @@ public abstract class Server {
     private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
         throws SaslException, IOException, AccessControlException,
         InterruptedException {
-      RpcSaslProto saslResponse = null;
+      final RpcSaslProto saslResponse;
       final SaslState state = saslMessage.getState(); // required      
       switch (state) {
         case NEGOTIATE: {
@@ -1372,27 +1383,18 @@ public abstract class Server {
           // SIMPLE is a legit option above.  we will send no response
           if (authMethod == AuthMethod.SIMPLE) {
             switchToSimple();
+            saslResponse = null;
             break;
           }
           // sasl server for tokens may already be instantiated
           if (saslServer == null || authMethod != AuthMethod.TOKEN) {
             saslServer = createSaslServer(authMethod);
           }
-          // fallthru to process sasl token
+          saslResponse = processSaslToken(saslMessage);
+          break;
         }
         case RESPONSE: {
-          if (!saslMessage.hasToken()) {
-            throw new SaslException("Client did not send a token");
-          }
-          byte[] saslToken = saslMessage.getToken().toByteArray();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Have read input token of size " + saslToken.length
-                + " for processing by saslServer.evaluateResponse()");
-          }
-          saslToken = saslServer.evaluateResponse(saslToken);
-          saslResponse = buildSaslResponse(
-              saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE,
-              saslToken);
+          saslResponse = processSaslToken(saslMessage);
           break;
         }
         default:
@@ -1401,6 +1403,22 @@ public abstract class Server {
       return saslResponse;
     }
 
+    private RpcSaslProto processSaslToken(RpcSaslProto saslMessage)
+        throws SaslException {
+      if (!saslMessage.hasToken()) {
+        throw new SaslException("Client did not send a token");
+      }
+      byte[] saslToken = saslMessage.getToken().toByteArray();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Have read input token of size " + saslToken.length
+            + " for processing by saslServer.evaluateResponse()");
+      }
+      saslToken = saslServer.evaluateResponse(saslToken);
+      return buildSaslResponse(
+          saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE,
+          saslToken);
+    }
+
     private void switchToSimple() {
       // disable SASL and blank out any SASL server
       authProtocol = AuthProtocol.NONE;
@@ -2208,12 +2226,9 @@ public abstract class Server {
         CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
 
     // Setup appropriate callqueue
-    String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." +
-        this.port;
-    Class queueClassToUse = conf.getClass(prefix + "." +
-        CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class);
-    this.callQueue = new CallQueueManager<Call>(queueClassToUse, maxQueueSize,
-      prefix, conf);
+    final String prefix = getQueueClassPrefix();
+    this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
+        maxQueueSize, prefix, conf);
 
     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
     this.authorize = 

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java?rev=1577710&r1=1577709&r2=1577710&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java Fri Mar 14 21:12:25 2014
@@ -18,22 +18,15 @@
 
 package org.apache.hadoop.ipc;
 
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.HashMap;
-import java.util.ArrayList;
-
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
-import org.junit.Assert;
-import org.junit.Assume;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.junit.Test;
-import org.junit.Before;
-import org.junit.After;
 
 public class TestCallQueueManager {
   private CallQueueManager<FakeCall> manager;
@@ -146,23 +139,26 @@ public class TestCallQueueManager {
   }
 
 
+  private static final Class<? extends BlockingQueue<FakeCall>> queueClass
+      = CallQueueManager.convertQueueClass(LinkedBlockingQueue.class, FakeCall.class);
+
   @Test
   public void testCallQueueCapacity() throws InterruptedException {
-    manager = new CallQueueManager<FakeCall>(LinkedBlockingQueue.class, 10, "", null);
+    manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
 
     assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity
   }
 
   @Test
   public void testEmptyConsume() throws InterruptedException {
-    manager = new CallQueueManager<FakeCall>(LinkedBlockingQueue.class, 10, "", null);
+    manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
 
     assertCanTake(manager, 0, 1); // Fails since it's empty
   }
 
   @Test(timeout=60000)
   public void testSwapUnderContention() throws InterruptedException {
-    manager = new CallQueueManager<FakeCall>(LinkedBlockingQueue.class, 5000, "", null);
+    manager = new CallQueueManager<FakeCall>(queueClass, 5000, "", null);
 
     ArrayList<Putter> producers = new ArrayList<Putter>();
     ArrayList<Taker> consumers = new ArrayList<Taker>();
@@ -191,7 +187,7 @@ public class TestCallQueueManager {
     Thread.sleep(10);
 
     for (int i=0; i < 5; i++) {
-      manager.swapQueue(LinkedBlockingQueue.class, 5000, "", null);
+      manager.swapQueue(queueClass, 5000, "", null);
     }
 
     // Stop the producers



Mime
View raw message