hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1552205 - in /hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common: ./ src/main/docs/ src/main/java/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/io/retry/ src/main/java/org/apache/hadoop/ipc/ src/mai...
Date Thu, 19 Dec 2013 02:04:01 GMT
Author: arp
Date: Thu Dec 19 02:03:47 2013
New Revision: 1552205

URL: http://svn.apache.org/r1552205
Log:
Merge forward from trunk to branch HDFS-2832

Modified:
    hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt   (contents,
props changed)
    hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/docs/  
(props changed)
    hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/  
(props changed)
    hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
    hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
    hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
    hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
    hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReflectionUtils.java
    hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/core/  
(props changed)
    hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt Thu Dec
19 02:03:47 2013
@@ -280,6 +280,8 @@ Trunk (Unreleased)
     HDFS-5471. CacheAdmin -listPools fails when user lacks permissions to view
     all pools (Andrew Wang via Colin Patrick McCabe)
 
+    HADOOP-10044 Improve the javadoc of rpc code (sanjay Radia)
+
   OPTIMIZATIONS
 
     HADOOP-7761. Improve the performance of raw comparisons. (todd)
@@ -395,12 +397,16 @@ Release 2.4.0 - UNRELEASED
 
     HADOOP-10102. Update commons IO from 2.1 to 2.4 (Akira Ajisaka via stevel)
 
+    HADOOP-10168. fix javadoc of ReflectionUtils#copy. (Thejas Nair via suresh)
+
+    HADOOP-10164. Allow UGI to login with a known Subject (bobby)
+
   OPTIMIZATIONS
 
     HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
 
-   HADOOP-10047. Add a direct-buffer based apis for compression. (Gopal V
-   via acmurthy)
+    HADOOP-10047. Add a direct-buffer based apis for compression. (Gopal V
+    via acmurthy)
 
   BUG FIXES
 
@@ -465,6 +471,16 @@ Release 2.4.0 - UNRELEASED
     HADOOP-10058. TestMetricsSystemImpl#testInitFirstVerifyStopInvokedImmediately
     fails on trunk (Chen He via jeagles)
 
+    HADOOP-8753. LocalDirAllocator throws "ArithmeticException: / by zero" when
+    there is no available space on configured local dir. (Benoy Antony via hitesh)
+
+    HADOOP-10106. Incorrect thread name in RPC log messages. (Ming Ma via jing9)
+
+    HADOOP-9611 mvn-rpmbuild against google-guice > 3.0 yields missing cglib
+    dependency (Timothy St. Clair via stevel)
+
+    HADOOP-10171. TestRPC fails intermittently on jkd7 (Mit Desai via jeagles)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt:r1551915
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1550313-1552204

Propchange: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/docs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/docs:r1535792-1536571,1536573-1552204

Propchange: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1549949-1552204

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
(original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
Thu Dec 19 02:03:47 2013
@@ -365,6 +365,10 @@ public class LocalDirAllocator {
           totalAvailable += availableOnDisk[i];
         }
 
+        if (totalAvailable == 0){
+          throw new DiskErrorException("No space available in any of the local directories.");
+        }
+
         // Keep rolling the wheel till we get a valid path
         Random r = new java.util.Random();
         while (numDirsSearched < numDirs && returnPath == null) {

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
(original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
Thu Dec 19 02:03:47 2013
@@ -68,7 +68,14 @@ public class RetryPolicies {
    * </p>
    */
   public static final RetryPolicy RETRY_FOREVER = new RetryForever();
-  
+
+  /**
+   * <p>
+   * Keep failing over forever
+   * </p>
+   */
+  public static final RetryPolicy FAILOVER_FOREVER = new FailoverForever();
+
   /**
    * <p>
    * Keep trying a limited number of times, waiting a fixed time between attempts,
@@ -166,6 +173,14 @@ public class RetryPolicies {
       return RetryAction.RETRY;
     }
   }
+
+  static class FailoverForever implements RetryPolicy {
+    @Override
+    public RetryAction shouldRetry(Exception e, int retries, int failovers,
+        boolean isIdempotentOrAtMostOnce) throws Exception {
+      return RetryAction.FAILOVER_AND_RETRY;
+    }
+  }
   
   /**
    * Retry up to maxRetries.

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
(original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
Thu Dec 19 02:03:47 2013
@@ -37,10 +37,24 @@ public class RpcConstants {
   
   public static final int INVALID_RETRY_COUNT = -1;
   
+ /**
+  * The Rpc-connection header is as follows 
+  * +----------------------------------+
+  * |  "hrpc" 4 bytes                  |      
+  * +----------------------------------+
+  * |  Version (1 byte)                |
+  * +----------------------------------+
+  * |  Service Class (1 byte)          |
+  * +----------------------------------+
+  * |  AuthProtocol (1 byte)           |      
+  * +----------------------------------+
+  */
+  
   /**
    * The first four bytes of Hadoop RPC connections
    */
   public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+  public static final int HEADER_LEN_AFTER_HRPC_PART = 3; // 3 bytes that follow
   
   // 1 : Introduce ping and server does not throw away RPCs
   // 3 : Introduce the protocol into the RPC connection header

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
(original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
Thu Dec 19 02:03:47 2013
@@ -551,14 +551,14 @@ public abstract class Server {
       
       @Override
       public void run() {
-        LOG.info("Starting " + getName());
+        LOG.info("Starting " + Thread.currentThread().getName());
         try {
           doRunLoop();
         } finally {
           try {
             readSelector.close();
           } catch (IOException ioe) {
-            LOG.error("Error closing read selector in " + this.getName(), ioe);
+            LOG.error("Error closing read selector in " + Thread.currentThread().getName(),
ioe);
           }
         }
       }
@@ -589,7 +589,7 @@ public abstract class Server {
             }
           } catch (InterruptedException e) {
             if (running) {                      // unexpected -- log it
-              LOG.info(getName() + " unexpectedly interrupted", e);
+              LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
             }
           } catch (IOException ex) {
             LOG.error("Error in Reader", ex);
@@ -620,7 +620,7 @@ public abstract class Server {
 
     @Override
     public void run() {
-      LOG.info(getName() + ": starting");
+      LOG.info(Thread.currentThread().getName() + ": starting");
       SERVER.set(Server.this);
       connectionManager.startIdleScan();
       while (running) {
@@ -652,7 +652,7 @@ public abstract class Server {
           closeCurrentConnection(key, e);
         }
       }
-      LOG.info("Stopping " + this.getName());
+      LOG.info("Stopping " + Thread.currentThread().getName());
 
       synchronized (this) {
         try {
@@ -710,14 +710,14 @@ public abstract class Server {
       try {
         count = c.readAndProcess();
       } catch (InterruptedException ieo) {
-        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
+        LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException",
ieo);
         throw ieo;
       } catch (Exception e) {
         // a WrappedRpcServerException is an exception that has been sent
         // to the client, so the stacktrace is unnecessary; any other
         // exceptions are unexpected internal server errors and thus the
         // stacktrace should be logged
-        LOG.info(getName() + ": readAndProcess from client " +
+        LOG.info(Thread.currentThread().getName() + ": readAndProcess from client " +
             c.getHostAddress() + " threw exception [" + e + "]",
             (e instanceof WrappedRpcServerException) ? null : e);
         count = -1; //so that the (count < 0) block is executed
@@ -740,7 +740,7 @@ public abstract class Server {
         try {
           acceptChannel.socket().close();
         } catch (IOException e) {
-          LOG.info(getName() + ":Exception in closing listener socket. " + e);
+          LOG.info(Thread.currentThread().getName() + ":Exception in closing listener socket.
" + e);
         }
       }
       for (Reader r : readers) {
@@ -773,16 +773,16 @@ public abstract class Server {
 
     @Override
     public void run() {
-      LOG.info(getName() + ": starting");
+      LOG.info(Thread.currentThread().getName() + ": starting");
       SERVER.set(Server.this);
       try {
         doRunLoop();
       } finally {
-        LOG.info("Stopping " + this.getName());
+        LOG.info("Stopping " + Thread.currentThread().getName());
         try {
           writeSelector.close();
         } catch (IOException ioe) {
-          LOG.error("Couldn't close write selector in " + this.getName(), ioe);
+          LOG.error("Couldn't close write selector in " + Thread.currentThread().getName(),
ioe);
         }
       }
     }
@@ -803,7 +803,7 @@ public abstract class Server {
                   doAsyncWrite(key);
               }
             } catch (IOException e) {
-              LOG.info(getName() + ": doAsyncWrite threw exception " + e);
+              LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception
" + e);
             }
           }
           long now = Time.now();
@@ -918,7 +918,7 @@ public abstract class Server {
           call = responseQueue.removeFirst();
           SocketChannel channel = call.connection.channel;
           if (LOG.isDebugEnabled()) {
-            LOG.debug(getName() + ": responding to " + call);
+            LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
           }
           //
           // Send as much data as we can in the non-blocking fashion
@@ -937,7 +937,7 @@ public abstract class Server {
               done = false;            // more calls pending to be sent.
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to " + call
+              LOG.debug(Thread.currentThread().getName() + ": responding to " + call
                   + " Wrote " + numBytes + " bytes.");
             }
           } else {
@@ -965,7 +965,7 @@ public abstract class Server {
               }
             }
             if (LOG.isDebugEnabled()) {
-              LOG.debug(getName() + ": responding to " + call
+              LOG.debug(Thread.currentThread().getName() + ": responding to " + call
                   + " Wrote partial " + numBytes + " bytes.");
             }
           }
@@ -973,7 +973,7 @@ public abstract class Server {
         }
       } finally {
         if (error && call != null) {
-          LOG.warn(getName()+", call " + call + ": output error");
+          LOG.warn(Thread.currentThread().getName()+", call " + call + ": output error");
           done = true;               // error. no more data for this channel.
           closeConnection(call.connection);
         }
@@ -1105,6 +1105,9 @@ public abstract class Server {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
+      
+      // the buffer is initialized to read the "hrpc" and after that to read
+      // the length of the Rpc-packet (i.e 4 bytes)
       this.dataLengthBuffer = ByteBuffer.allocate(4);
       this.unwrappedData = null;
       this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
@@ -1200,7 +1203,16 @@ public abstract class Server {
       }
     }
 
-    private Throwable getCauseForInvalidToken(IOException e) {
+    /**
+     * Some exceptions ({@link RetriableException} and {@link StandbyException})
+     * that are wrapped as a cause of parameter e are unwrapped so that they can
+     * be sent as the true cause to the client side. In case of
+     * {@link InvalidToken} we go one level deeper to get the true cause.
+     * 
+     * @param e the exception that may have a cause we want to unwrap.
+     * @return the true cause for some exceptions.
+     */
+    private Throwable getTrueCause(IOException e) {
       Throwable cause = e;
       while (cause != null) {
         if (cause instanceof RetriableException) {
@@ -1223,6 +1235,18 @@ public abstract class Server {
       return e;
     }
     
+    /**
+     * Process saslMessage and send saslResponse back
+     * @param saslMessage received SASL message
+     * @throws WrappedRpcServerException setup failed due to SASL negotiation 
+     *         failure, premature or invalid connection context, or other state 
+     *         errors. This exception needs to be sent to the client. This 
+     *         exception will wrap {@link RetriableException}, 
+     *         {@link InvalidToken}, {@link StandbyException} or 
+     *         {@link SaslException}.
+     * @throws IOException if sending reply fails
+     * @throws InterruptedException
+     */
     private void saslProcess(RpcSaslProto saslMessage)
         throws WrappedRpcServerException, IOException, InterruptedException {
       if (saslContextEstablished) {
@@ -1239,7 +1263,7 @@ public abstract class Server {
           // attempting user could be null
           AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"
               + attemptingUser + " (" + e.getLocalizedMessage() + ")");
-          throw (IOException) getCauseForInvalidToken(e);
+          throw (IOException) getTrueCause(e);
         }
         
         if (saslServer != null && saslServer.isComplete()) {
@@ -1274,13 +1298,26 @@ public abstract class Server {
       }
     }
     
+    /**
+     * Process a saslMessge.
+     * @param saslMessage received SASL message
+     * @return the sasl response to send back to client
+     * @throws SaslException if authentication or generating response fails, 
+     *                       or SASL protocol mixup
+     * @throws IOException if a SaslServer cannot be created
+     * @throws AccessControlException if the requested authentication type 
+     *         is not supported or trying to re-attempt negotiation.
+     * @throws InterruptedException
+     */
     private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
-        throws IOException, InterruptedException {
+        throws SaslException, IOException, AccessControlException,
+        InterruptedException {
       RpcSaslProto saslResponse = null;
       final SaslState state = saslMessage.getState(); // required      
       switch (state) {
         case NEGOTIATE: {
           if (sentNegotiate) {
+            // FIXME shouldn't this be SaslException?
             throw new AccessControlException(
                 "Client already attempted negotiation");
           }
@@ -1402,12 +1439,30 @@ public abstract class Server {
       }
     }
 
+    /**
+     * This method reads in a non-blocking fashion from the channel: 
+     * this method is called repeatedly when data is present in the channel; 
+     * when it has enough data to process one rpc it processes that rpc.
+     * 
+     * On the first pass, it processes the connectionHeader, 
+     * connectionContext (an outOfBand RPC) and at most one RPC request that 
+     * follows that. On future passes it will process at most one RPC request.
+     *  
+     * Quirky things: dataLengthBuffer (4 bytes) is used to read "hrpc" OR 
+     * rpc request length.
+     *    
+     * @return -1 in case of error, else num bytes read so far
+     * @throws WrappedRpcServerException - an exception that has already been 
+     *         sent back to the client that does not require verbose logging
+     *         by the Listener thread
+     * @throws IOException - internal error that should not be returned to
+     *         client, typically failure to respond to client
+     * @throws InterruptedException
+     */
     public int readAndProcess()
         throws WrappedRpcServerException, IOException, InterruptedException {
       while (true) {
-        /* Read at most one RPC. If the header is not read completely yet
-         * then iterate until we read first RPC or until there is no data left.
-         */    
+        // dataLengthBuffer is used to read "hrpc" or the rpc-packet length
         int count = -1;
         if (dataLengthBuffer.remaining() > 0) {
           count = channelRead(channel, dataLengthBuffer);       
@@ -1416,9 +1471,11 @@ public abstract class Server {
         }
         
         if (!connectionHeaderRead) {
-          //Every connection is expected to send the header.
+          // Every connection is expected to send the header;
+          // so far we read "hrpc" of the connection header.
           if (connectionHeaderBuf == null) {
-            connectionHeaderBuf = ByteBuffer.allocate(3);
+            // for the bytes that follow "hrpc", in the connection header
+            connectionHeaderBuf = ByteBuffer.allocate(HEADER_LEN_AFTER_HRPC_PART);
           }
           count = channelRead(channel, connectionHeaderBuf);
           if (count < 0 || connectionHeaderBuf.remaining() > 0) {
@@ -1451,27 +1508,30 @@ public abstract class Server {
           // this may switch us into SIMPLE
           authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));          
           
-          dataLengthBuffer.clear();
+          dataLengthBuffer.clear(); // clear to next read rpc packet len
           connectionHeaderBuf = null;
           connectionHeaderRead = true;
-          continue;
+          continue; // connection header read, now read  4 bytes rpc packet len
         }
         
-        if (data == null) {
+        if (data == null) { // just read 4 bytes -  length of RPC packet
           dataLengthBuffer.flip();
           dataLength = dataLengthBuffer.getInt();
           checkDataLength(dataLength);
+          // Set buffer for reading EXACTLY the RPC-packet length and no more.
           data = ByteBuffer.allocate(dataLength);
         }
-        
+        // Now read the RPC packet
         count = channelRead(channel, data);
         
         if (data.remaining() == 0) {
-          dataLengthBuffer.clear();
+          dataLengthBuffer.clear(); // to read length of future rpc packets
           data.flip();
           boolean isHeaderRead = connectionContextRead;
           processOneRpc(data.array());
           data = null;
+          // the last rpc-request we processed could have simply been the
+          // connectionContext; if so continue to read the first RPC.
           if (!isHeaderRead) {
             continue;
           }
@@ -1508,8 +1568,16 @@ public abstract class Server {
       return authProtocol;
     }
 
+    /**
+     * Process the Sasl's Negotiate request, including the optimization of 
+     * accelerating token negotiation.
+     * @return the response to Negotiate request - the list of enabled 
+     *         authMethods and challenge if the TOKENS are supported. 
+     * @throws SaslException - if attempt to generate challenge fails.
+     * @throws IOException - if it fails to create the SASL server for Tokens
+     */
     private RpcSaslProto buildSaslNegotiateResponse()
-        throws IOException, InterruptedException {
+        throws InterruptedException, SaslException, IOException {
       RpcSaslProto negotiateMessage = negotiateResponse;
       // accelerate token negotiation by sending initial challenge
       // in the negotiation response
@@ -1635,8 +1703,11 @@ public abstract class Server {
     /**
      * Process a wrapped RPC Request - unwrap the SASL packet and process
      * each embedded RPC request 
-     * @param buf - SASL wrapped request of one or more RPCs
+     * @param inBuf - SASL wrapped request of one or more RPCs
      * @throws IOException - SASL packet cannot be unwrapped
+     * @throws WrappedRpcServerException - an exception that has already been 
+     *         sent back to the client that does not require verbose logging
+     *         by the Listener thread
      * @throws InterruptedException
      */    
     private void unwrapPacketAndProcessRpcs(byte[] inBuf)
@@ -1677,13 +1748,21 @@ public abstract class Server {
     }
     
     /**
-     * Process an RPC Request - handle connection setup and decoding of
-     * request into a Call
+     * Process one RPC Request from buffer read from socket stream 
+     *  - decode rpc in a rpc-Call
+     *  - handle out-of-band RPC requests such as the initial connectionContext
+     *  - A successfully decoded RpcCall will be deposited in RPC-Q and
+     *    its response will be sent later when the request is processed.
+     * 
+     * Prior to this call the connectionHeader ("hrpc...") has been handled and
+     * if SASL then SASL has been established and the buf we are passed
+     * has been unwrapped from SASL.
+     * 
      * @param buf - contains the RPC request header and the rpc request
      * @throws IOException - internal error that should not be returned to
      *         client, typically failure to respond to client
-     * @throws WrappedRpcServerException - an exception to be sent back to
-     *         the client that does not require verbose logging by the
+     * @throws WrappedRpcServerException - an exception that is sent back to the
+     *         client in this method and does not require verbose logging by the
      *         Listener thread
      * @throws InterruptedException
      */    
@@ -1753,8 +1832,11 @@ public abstract class Server {
     }
 
     /**
-     * Process an RPC Request - the connection headers and context must
-     * have been already read
+     * Process an RPC Request 
+     *   - the connection headers and context must have been already read.
+     *   - Based on the rpcKind, decode the rpcRequest.
+     *   - A successfully decoded RpcCall will be deposited in RPC-Q and
+     *     its response will be sent later when the request is processed.
      * @param header - RPC request header
      * @param dis - stream to request payload
      * @throws WrappedRpcServerException - due to fatal rpc layer issues such
@@ -1803,7 +1885,8 @@ public abstract class Server {
      * @param dis - stream to request payload
      * @throws WrappedRpcServerException - setup failed due to SASL
      *         negotiation failure, premature or invalid connection context,
-     *         or other state errors 
+     *         or other state errors. This exception needs to be sent to the 
+     *         client.
      * @throws IOException - failed to send a response back to the client
      * @throws InterruptedException
      */
@@ -1928,7 +2011,7 @@ public abstract class Server {
 
     @Override
     public void run() {
-      LOG.debug(getName() + ": starting");
+      LOG.debug(Thread.currentThread().getName() + ": starting");
       SERVER.set(Server.this);
       ByteArrayOutputStream buf = 
         new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
@@ -1936,7 +2019,7 @@ public abstract class Server {
         try {
           final Call call = callQueue.take(); // pop the queue; maybe blocked here
           if (LOG.isDebugEnabled()) {
-            LOG.debug(getName() + ": " + call + " for RpcKind " + call.rpcKind);
+            LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " +
call.rpcKind);
           }
           String errorClass = null;
           String error = null;
@@ -1969,7 +2052,7 @@ public abstract class Server {
             if (e instanceof UndeclaredThrowableException) {
               e = e.getCause();
             }
-            String logMsg = getName() + ", call " + call + ": error: " + e;
+            String logMsg = Thread.currentThread().getName() + ", call " + call + ": error:
" + e;
             if (e instanceof RuntimeException || e instanceof Error) {
               // These exception types indicate something is probably wrong
               // on the server side, as opposed to just a normal exceptional
@@ -2018,13 +2101,13 @@ public abstract class Server {
           }
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
-            LOG.info(getName() + " unexpectedly interrupted", e);
+            LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
           }
         } catch (Exception e) {
-          LOG.info(getName() + " caught an exception", e);
+          LOG.info(Thread.currentThread().getName() + " caught an exception", e);
         }
       }
-      LOG.debug(getName() + ": exiting");
+      LOG.debug(Thread.currentThread().getName() + ": exiting");
     }
 
   }

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
(original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
Thu Dec 19 02:03:47 2013
@@ -477,7 +477,7 @@ public class UserGroupInformation {
     
     private static final AppConfigurationEntry[] SIMPLE_CONF = 
       new AppConfigurationEntry[]{OS_SPECIFIC_LOGIN, HADOOP_LOGIN};
-
+    
     private static final AppConfigurationEntry[] USER_KERBEROS_CONF =
       new AppConfigurationEntry[]{OS_SPECIFIC_LOGIN, USER_KERBEROS_LOGIN,
                                   HADOOP_LOGIN};
@@ -682,44 +682,59 @@ public class UserGroupInformation {
   public synchronized 
   static UserGroupInformation getLoginUser() throws IOException {
     if (loginUser == null) {
-      ensureInitialized();
-      try {
-        Subject subject = new Subject();
-        LoginContext login =
-            newLoginContext(authenticationMethod.getLoginAppName(), 
-                            subject, new HadoopConfiguration());
-        login.login();
-        UserGroupInformation realUser = new UserGroupInformation(subject);
-        realUser.setLogin(login);
-        realUser.setAuthenticationMethod(authenticationMethod);
-        realUser = new UserGroupInformation(login.getSubject());
-        // If the HADOOP_PROXY_USER environment variable or property
-        // is specified, create a proxy user as the logged in user.
-        String proxyUser = System.getenv(HADOOP_PROXY_USER);
-        if (proxyUser == null) {
-          proxyUser = System.getProperty(HADOOP_PROXY_USER);
-        }
-        loginUser = proxyUser == null ? realUser : createProxyUser(proxyUser, realUser);
-
-        String fileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
-        if (fileLocation != null) {
-          // Load the token storage file and put all of the tokens into the
-          // user. Don't use the FileSystem API for reading since it has a lock
-          // cycle (HADOOP-9212).
-          Credentials cred = Credentials.readTokenStorageFile(
-              new File(fileLocation), conf);
-          loginUser.addCredentials(cred);
-        }
-        loginUser.spawnAutoRenewalThreadForUserCreds();
-      } catch (LoginException le) {
-        LOG.debug("failure to login", le);
-        throw new IOException("failure to login", le);
+      loginUserFromSubject(null);
+    }
+    return loginUser;
+  }
+  
+  /**
+   * Log in a user using the given subject
+   * @parma subject the subject to use when logging in a user, or null to 
+   * create a new subject.
+   * @throws IOException if login fails
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public synchronized 
+  static void loginUserFromSubject(Subject subject) throws IOException {
+    ensureInitialized();
+    try {
+      if (subject == null) {
+        subject = new Subject();
+      }
+      LoginContext login =
+          newLoginContext(authenticationMethod.getLoginAppName(), 
+                          subject, new HadoopConfiguration());
+      login.login();
+      UserGroupInformation realUser = new UserGroupInformation(subject);
+      realUser.setLogin(login);
+      realUser.setAuthenticationMethod(authenticationMethod);
+      realUser = new UserGroupInformation(login.getSubject());
+      // If the HADOOP_PROXY_USER environment variable or property
+      // is specified, create a proxy user as the logged in user.
+      String proxyUser = System.getenv(HADOOP_PROXY_USER);
+      if (proxyUser == null) {
+        proxyUser = System.getProperty(HADOOP_PROXY_USER);
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("UGI loginUser:"+loginUser);
+      loginUser = proxyUser == null ? realUser : createProxyUser(proxyUser, realUser);
+
+      String fileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
+      if (fileLocation != null) {
+        // Load the token storage file and put all of the tokens into the
+        // user. Don't use the FileSystem API for reading since it has a lock
+        // cycle (HADOOP-9212).
+        Credentials cred = Credentials.readTokenStorageFile(
+            new File(fileLocation), conf);
+        loginUser.addCredentials(cred);
       }
+      loginUser.spawnAutoRenewalThreadForUserCreds();
+    } catch (LoginException le) {
+      LOG.debug("failure to login", le);
+      throw new IOException("failure to login", le);
     }
-    return loginUser;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("UGI loginUser:"+loginUser);
+    } 
   }
 
   @InterfaceAudience.Private

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReflectionUtils.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReflectionUtils.java
(original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ReflectionUtils.java
Thu Dec 19 02:03:47 2013
@@ -275,8 +275,9 @@ public class ReflectionUtils {
   
   /**
    * Make a copy of the writable object using serialization to a buffer
-   * @param dst the object to copy from
-   * @param src the object to copy into, which is destroyed
+   * @param src the object to copy from
+   * @param dst the object to copy into, which is destroyed
+   * @return dst param (the copy)
    * @throws IOException
    */
   @SuppressWarnings("unchecked")

Propchange: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/core/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/core:r1535792-1536571,1536573-1552204

Modified: hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
(original)
+++ hadoop/common/branches/HDFS-2832/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
Thu Dec 19 02:03:47 2013
@@ -957,6 +957,7 @@ public class TestRPC {
       proxy.sleep(pingInterval*4);
     } finally {
       if (proxy != null) RPC.stopProxy(proxy);
+      server.stop();
     }
   }
 



Mime
View raw message