hadoop-common-commits mailing list archives

From szets...@apache.org
Subject svn commit: r1419193 - in /hadoop/common/branches/HDFS-2802/hadoop-common-project: hadoop-auth/ hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/ hadoop-auth/src/site/apt/ hadoop-auth/src/test/java/org/apache/hadoop/security/a...
Date Mon, 10 Dec 2012 03:37:41 GMT
Author: szetszwo
Date: Mon Dec 10 03:37:32 2012
New Revision: 1419193

URL: http://svn.apache.org/viewvc?rev=1419193&view=rev
Log:
Merge r1415804 through r1419190 from trunk.

Added:
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/AltKerberosAuthenticationHandler.java
      - copied unchanged from r1419190, hadoop/common/trunk/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/AltKerberosAuthenticationHandler.java
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestAltKerberosAuthenticationHandler.java
      - copied unchanged from r1419190, hadoop/common/trunk/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestAltKerberosAuthenticationHandler.java
Modified:
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/pom.xml
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/site/apt/Configuration.apt.vm
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/site/apt/index.apt.vm
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestKerberosAuthenticationHandler.java
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/docs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/UTF8.java
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsSystem.java
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/core/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextTestHelper.java
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestHelper.java
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestUTF8.java
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
    hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/pom.xml?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/pom.xml Mon Dec 10 03:37:32 2012
@@ -110,6 +110,7 @@
             <exclude>**/${test.exclude}.java</exclude>
             <exclude>${test.exclude.pattern}</exclude>
             <exclude>**/TestKerberosAuth*.java</exclude>
+            <exclude>**/TestAltKerberosAuth*.java</exclude>
             <exclude>**/Test*$*.java</exclude>
           </excludes>
         </configuration>

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/site/apt/Configuration.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/site/apt/Configuration.apt.vm?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/site/apt/Configuration.apt.vm (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/site/apt/Configuration.apt.vm Mon Dec 10 03:37:32 2012
@@ -178,4 +178,71 @@ Configuration
 </web-app>
 +---+
 
+** AltKerberos Configuration
+
+  <<IMPORTANT>>: A KDC must be configured and running.
+
+  The AltKerberos authentication mechanism is a partially implemented derivative
+  of the Kerberos SPNEGO authentication mechanism which allows a "mixed" form of
+  authentication where Kerberos SPNEGO is used by non-browsers while an
+  alternate form of authentication (to be implemented by the user) is used for
+  browsers.  To use AltKerberos as the authentication mechanism (besides
+  providing an implementation), the authentication filter must be configured
+  with the following init parameters, in addition to the previously mentioned
+  Kerberos SPNEGO ones:
+
+    * <<<[PREFIX.]type>>>: the full class name of the implementation of
+      AltKerberosAuthenticationHandler to use.
+
+    * <<<[PREFIX.]alt-kerberos.non-browser.user-agents>>>: a comma-separated
+      list of which user-agents should be considered non-browsers.
+
+  <<Example>>:
+
++---+
+<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee">
+    ...
+
+    <filter>
+        <filter-name>kerberosFilter</filter-name>
+        <filter-class>org.apache.hadoop.security.auth.server.AuthenticationFilter</filter-class>
+        <init-param>
+            <param-name>type</param-name>
+            <param-value>org.my.subclass.of.AltKerberosAuthenticationHandler</param-value>
+        </init-param>
+        <init-param>
+            <param-name>alt-kerberos.non-browser.user-agents</param-name>
+            <param-value>java,curl,wget,perl</param-value>
+        </init-param>
+        <init-param>
+            <param-name>token.validity</param-name>
+            <param-value>30</param-value>
+        </init-param>
+        <init-param>
+            <param-name>cookie.domain</param-name>
+            <param-value>.foo.com</param-value>
+        </init-param>
+        <init-param>
+            <param-name>cookie.path</param-name>
+            <param-value>/</param-value>
+        </init-param>
+        <init-param>
+            <param-name>kerberos.principal</param-name>
+            <param-value>HTTP/localhost@LOCALHOST</param-value>
+        </init-param>
+        <init-param>
+            <param-name>kerberos.keytab</param-name>
+            <param-value>/tmp/auth.keytab</param-value>
+        </init-param>
+    </filter>
+
+    <filter-mapping>
+        <filter-name>kerberosFilter</filter-name>
+        <url-pattern>/kerberos/*</url-pattern>
+    </filter-mapping>
+
+    ...
+</web-app>
++---+
+
   \[ {{{./index.html}Go Back}} \]
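
  Note: the AltKerberosAuthenticationHandler added by this merge is copied
  unchanged from trunk, so its source is not included in this message. The
  following is a minimal, hypothetical sketch of the user-supplied subclass
  that the configuration above expects; the alternateAuthenticate() signature
  and the "user.name" request parameter are assumptions made for illustration
  only, not taken from this commit.

    package org.my.subclass.of;

    import java.io.IOException;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;

    import org.apache.hadoop.security.authentication.client.AuthenticationException;
    import org.apache.hadoop.security.authentication.server.AltKerberosAuthenticationHandler;
    import org.apache.hadoop.security.authentication.server.AuthenticationToken;

    // Hypothetical subclass; the browser-side hook below is assumed from the
    // class name since the handler source is not part of this message.
    public class MyAltKerberosAuthenticationHandler
        extends AltKerberosAuthenticationHandler {

      @Override
      public AuthenticationToken alternateAuthenticate(HttpServletRequest request,
          HttpServletResponse response) throws IOException, AuthenticationException {
        // Browser requests land here; user-agents listed in
        // alt-kerberos.non-browser.user-agents still take the inherited
        // Kerberos SPNEGO path.
        String user = request.getParameter("user.name");  // illustrative only
        if (user == null || user.isEmpty()) {
          response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
          return null;  // no token issued; the filter rejects the request
        }
        return new AuthenticationToken(user, user, getType());
      }
    }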

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/site/apt/index.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/site/apt/index.apt.vm?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/site/apt/index.apt.vm (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/site/apt/index.apt.vm Mon Dec 10 03:37:32 2012
@@ -24,6 +24,11 @@ Hadoop Auth, Java HTTP SPNEGO ${project.
   Hadoop Auth also supports additional authentication mechanisms on the client
   and the server side via 2 simple interfaces.
 
+  Additionally, it provides a partially implemented derivative of the Kerberos
+  SPNEGO authentication to allow a "mixed" form of authentication where Kerberos
+  SPNEGO is used by non-browsers while an alternate form of authentication
+  (to be implemented by the user) is used for browsers.
+
 * License
 
   Hadoop Auth is distributed under {{{http://www.apache.org/licenses/}Apache

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestKerberosAuthenticationHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestKerberosAuthenticationHandler.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestKerberosAuthenticationHandler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestKerberosAuthenticationHandler.java Mon Dec 10 03:37:32 2012
@@ -28,23 +28,37 @@ import org.ietf.jgss.Oid;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import java.lang.reflect.Field;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 
 public class TestKerberosAuthenticationHandler extends TestCase {
 
-  private KerberosAuthenticationHandler handler;
+  protected KerberosAuthenticationHandler handler;
+
+  protected KerberosAuthenticationHandler getNewAuthenticationHandler() {
+    return new KerberosAuthenticationHandler();
+  }
+
+  protected String getExpectedType() {
+    return KerberosAuthenticationHandler.TYPE;
+  }
+
+  protected Properties getDefaultProperties() {
+    Properties props = new Properties();
+    props.setProperty(KerberosAuthenticationHandler.PRINCIPAL,
+            KerberosTestUtils.getServerPrincipal());
+    props.setProperty(KerberosAuthenticationHandler.KEYTAB,
+            KerberosTestUtils.getKeytabFile());
+    props.setProperty(KerberosAuthenticationHandler.NAME_RULES,
+            "RULE:[1:$1@$0](.*@" + KerberosTestUtils.getRealm()+")s/@.*//\n");
+    return props;
+  }
 
   @Override
   protected void setUp() throws Exception {
     super.setUp();
-    handler = new KerberosAuthenticationHandler();
-    Properties props = new Properties();
-    props.setProperty(KerberosAuthenticationHandler.PRINCIPAL, KerberosTestUtils.getServerPrincipal());
-    props.setProperty(KerberosAuthenticationHandler.KEYTAB, KerberosTestUtils.getKeytabFile());
-    props.setProperty(KerberosAuthenticationHandler.NAME_RULES,
-                      "RULE:[1:$1@$0](.*@" + KerberosTestUtils.getRealm()+")s/@.*//\n");
+    handler = getNewAuthenticationHandler();
+    Properties props = getDefaultProperties();
     try {
       handler.init(props);
     } catch (Exception ex) {
@@ -71,10 +85,8 @@ public class TestKerberosAuthenticationH
 
     KerberosName.setRules("RULE:[1:$1@$0](.*@FOO)s/@.*//\nDEFAULT");
     
-    handler = new KerberosAuthenticationHandler();
-    Properties props = new Properties();
-    props.setProperty(KerberosAuthenticationHandler.PRINCIPAL, KerberosTestUtils.getServerPrincipal());
-    props.setProperty(KerberosAuthenticationHandler.KEYTAB, KerberosTestUtils.getKeytabFile());
+    handler = getNewAuthenticationHandler();
+    Properties props = getDefaultProperties();
     props.setProperty(KerberosAuthenticationHandler.NAME_RULES, "RULE:[1:$1@$0](.*@BAR)s/@.*//\nDEFAULT");
     try {
       handler.init(props);
@@ -97,8 +109,7 @@ public class TestKerberosAuthenticationH
   }
 
   public void testType() throws Exception {
-    KerberosAuthenticationHandler handler = new KerberosAuthenticationHandler();
-    assertEquals(KerberosAuthenticationHandler.TYPE, handler.getType());
+    assertEquals(getExpectedType(), handler.getType());
   }
 
   public void testRequestWithoutAuthorization() throws Exception {
@@ -182,7 +193,7 @@ public class TestKerberosAuthenticationH
 
       assertEquals(KerberosTestUtils.getClientPrincipal(), authToken.getName());
       assertTrue(KerberosTestUtils.getClientPrincipal().startsWith(authToken.getUserName()));
-      assertEquals(KerberosAuthenticationHandler.TYPE, authToken.getType());
+      assertEquals(getExpectedType(), authToken.getType());
     } else {
       Mockito.verify(response).setHeader(Mockito.eq(KerberosAuthenticator.WWW_AUTHENTICATE),
                                          Mockito.matches(KerberosAuthenticator.NEGOTIATE + " .*"));

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/CHANGES.txt Mon Dec 10 03:37:32 2012
@@ -143,6 +143,9 @@ Trunk (Unreleased)
 
   BUG FIXES
 
+    HADOOP-8418. Update UGI Principal classes name for running with
+    IBM JDK on 64 bits Windows.  (Yu Gao via eyang)
+
     HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.
     (Devaraj K via umamahesh)
 
@@ -289,6 +292,9 @@ Trunk (Unreleased)
     HADOOP-9037. Bug in test-patch.sh and precommit build process (Kihwal Lee
     via jlowe)
 
+    HADOOP-9121. InodeTree.java has redundant check for vName while 
+    throwing exception. (Arup Malakar via suresh)
+
   OPTIMIZATIONS
 
     HADOOP-7761. Improve the performance of raw comparisons. (todd)
@@ -306,6 +312,12 @@ Release 2.0.3-alpha - Unreleased 
 
     HADOOP-9020. Add a SASL PLAIN server (daryn via bobby)
 
+    HADOOP-9090. Support on-demand publish of metrics. (Mostafa Elhemali via
+    suresh)
+
+    HADOOP-9054. Add AuthenticationHandler that uses Kerberos but allows for 
+    an alternate form of authentication for browsers. (rkanter via tucu)
+
   IMPROVEMENTS
 
     HADOOP-8789. Tests setLevel(Level.OFF) should be Level.ERROR.
@@ -456,6 +468,11 @@ Release 2.0.3-alpha - Unreleased 
     HADOOP-8958. ViewFs:Non absolute mount name failures when running 
     multiple tests on Windows. (Chris Nauroth via suresh)
 
+    HADOOP-9103. UTF8 class does not properly decode Unicode characters
+    outside the basic multilingual plane. (todd)
+
+    HADOOP-9070. Kerberos SASL server cannot find kerberos key. (daryn via atm)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1415804-1419190

Propchange: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/docs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/docs:r1415804-1419190

Propchange: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1415804-1419190

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java Mon Dec 10 03:37:32 2012
@@ -118,8 +118,7 @@ abstract class InodeTree<T> {
       return result;
     }
     
-    INode<T> resolveInternal(final String pathComponent)
-        throws FileNotFoundException {
+    INode<T> resolveInternal(final String pathComponent) {
       return children.get(pathComponent);
     }
     
@@ -336,8 +335,8 @@ abstract class InodeTree<T> {
     }
     if (!gotMountTableEntry) {
       throw new IOException(
-          "ViewFs: Cannot initialize: Empty Mount table in config for " + 
-             vName == null ? "viewfs:///" : ("viewfs://" + vName + "/"));
+          "ViewFs: Cannot initialize: Empty Mount table in config for " +
+             "viewfs://" + vName + "/");
     }
   }
 

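  Note on the InodeTree change: in the removed lines, string concatenation binds
  more tightly than == in Java, so the old expression compared the already
  concatenated message to null. That comparison is never true, the "viewfs:///"
  branch of the ternary was unreachable, and the thrown message kept only the
  "viewfs://" + vName + "/" part. That is the redundant check HADOOP-9121
  (listed in CHANGES.txt above) removes. A small illustration:

    String vName = null;
    // Parses as (("ViewFs: ... for " + vName) == null); a concatenation result
    // is never null, so this is always false and the first ternary branch was
    // dead code while the message prefix was silently dropped.
    boolean alwaysFalse =
        "ViewFs: Cannot initialize: Empty Mount table in config for " + vName == null;
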
Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java Mon Dec 10 03:37:32 2012
@@ -1858,10 +1858,10 @@ public class SequenceFile {
         UTF8 className = new UTF8();
 
         className.readFields(in);
-        keyClassName = className.toString(); // key class name
+        keyClassName = className.toStringChecked(); // key class name
 
         className.readFields(in);
-        valClassName = className.toString(); // val class name
+        valClassName = className.toStringChecked(); // val class name
       } else {
         keyClassName = Text.readString(in);
         valClassName = Text.readString(in);

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/UTF8.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/UTF8.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/UTF8.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/UTF8.java Mon Dec 10 03:37:32 2012
@@ -21,7 +21,9 @@ package org.apache.hadoop.io;
 import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.UTFDataFormatException;
 
+import org.apache.hadoop.util.StringUtils;
 
 import org.apache.commons.logging.*;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -31,6 +33,9 @@ import org.apache.hadoop.classification.
  * 
  * <p>Also includes utilities for efficiently reading and writing UTF-8.
  *
+ * Note that this decodes UTF-8 but actually encodes CESU-8, a variant of
+ * UTF-8: see http://en.wikipedia.org/wiki/CESU-8
+ *
  * @deprecated replaced by Text
  */
 @Deprecated
@@ -151,6 +156,21 @@ public class UTF8 implements WritableCom
     }
     return buffer.toString();
   }
+  
+  /**
+   * Convert to a string, checking for valid UTF8.
+   * @return the converted string
+   * @throws UTFDataFormatException if the underlying bytes contain invalid
+   * UTF8 data.
+   */
+  public String toStringChecked() throws IOException {
+    StringBuilder buffer = new StringBuilder(length);
+    synchronized (IBUF) {
+      IBUF.reset(bytes, length);
+      readChars(IBUF, buffer, length);
+    }
+    return buffer.toString();
+  }
 
   /** Returns true iff <code>o</code> is a UTF8 with the same contents.  */
   @Override
@@ -209,6 +229,19 @@ public class UTF8 implements WritableCom
     return result;
   }
 
+  /**
+   * Convert a UTF-8 encoded byte array back into a string.
+   *
+   * @throws IOException if the byte array is invalid UTF8
+   */
+  public static String fromBytes(byte[] bytes) throws IOException {
+    DataInputBuffer dbuf = new DataInputBuffer();
+    dbuf.reset(bytes, 0, bytes.length);
+    StringBuilder buf = new StringBuilder(bytes.length);
+    readChars(dbuf, buf, bytes.length);
+    return buf.toString();
+  }
+
   /** Read a UTF-8 encoded string.
    *
    * @see DataInput#readUTF()
@@ -221,7 +254,7 @@ public class UTF8 implements WritableCom
   }
 
   private static void readChars(DataInput in, StringBuilder buffer, int nBytes)
-    throws IOException {
+    throws UTFDataFormatException, IOException {
     DataOutputBuffer obuf = OBUF_FACTORY.get();
     obuf.reset();
     obuf.write(in, nBytes);
@@ -230,18 +263,60 @@ public class UTF8 implements WritableCom
     while (i < nBytes) {
       byte b = bytes[i++];
       if ((b & 0x80) == 0) {
+        // 0b0xxxxxxx: 1-byte sequence
         buffer.append((char)(b & 0x7F));
-      } else if ((b & 0xE0) != 0xE0) {
+      } else if ((b & 0xE0) == 0xC0) {
+        if (i >= nBytes) {
+          throw new UTFDataFormatException("Truncated UTF8 at " +
+              StringUtils.byteToHexString(bytes, i - 1, 1));
+        }
+        // 0b110xxxxx: 2-byte sequence
         buffer.append((char)(((b & 0x1F) << 6)
             | (bytes[i++] & 0x3F)));
-      } else {
+      } else if ((b & 0xF0) == 0xE0) {
+        // 0b1110xxxx: 3-byte sequence
+        if (i + 1 >= nBytes) {
+          throw new UTFDataFormatException("Truncated UTF8 at " +
+              StringUtils.byteToHexString(bytes, i - 1, 2));
+        }
         buffer.append((char)(((b & 0x0F) << 12)
             | ((bytes[i++] & 0x3F) << 6)
             |  (bytes[i++] & 0x3F)));
+      } else if ((b & 0xF8) == 0xF0) {
+        if (i + 2 >= nBytes) {
+          throw new UTFDataFormatException("Truncated UTF8 at " +
+              StringUtils.byteToHexString(bytes, i - 1, 3));
+        }
+        // 0b11110xxx: 4-byte sequence
+        int codepoint =
+            ((b & 0x07) << 18)
+          | ((bytes[i++] & 0x3F) <<  12)
+          | ((bytes[i++] & 0x3F) <<  6)
+          | ((bytes[i++] & 0x3F));
+        buffer.append(highSurrogate(codepoint))
+              .append(lowSurrogate(codepoint));
+      } else {
+        // The UTF8 standard describes 5-byte and 6-byte sequences, but
+        // these are no longer allowed as of 2003 (see RFC 3629)
+
+        // Only show the next 6 bytes max in the error code - in case the
+        // buffer is large, this will prevent an exceedingly large message.
+        int endForError = Math.min(i + 5, nBytes);
+        throw new UTFDataFormatException("Invalid UTF8 at " +
+            StringUtils.byteToHexString(bytes, i - 1, endForError));
       }
     }
   }
 
+  private static char highSurrogate(int codePoint) {
+    return (char) ((codePoint >>> 10)
+        + (Character.MIN_HIGH_SURROGATE - (Character.MIN_SUPPLEMENTARY_CODE_POINT >>> 10)));
+  }
+
+  private static char lowSurrogate(int codePoint) {
+    return (char) ((codePoint & 0x3ff) + Character.MIN_LOW_SURROGATE);
+  }
+
   /** Write a UTF-8 encoded string.
    *
    * @see DataOutput#writeUTF(String)
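
  The 4-byte branch added to readChars() above maps a supplementary code point to
  a UTF-16 surrogate pair via the new highSurrogate()/lowSurrogate() helpers. A
  worked example with U+1F431 ("CAT FACE"), the character that
  TestUTF8.testNonBasicMultilingualPlane below round-trips:

    // Bytes f0 9f 90 b1 decode to the code point 0x1F431; the helpers then yield:
    int codepoint = 0x1F431;
    char hi = (char) ((codepoint >>> 10)
        + (Character.MIN_HIGH_SURROGATE - (Character.MIN_SUPPLEMENTARY_CODE_POINT >>> 10)));
    char lo = (char) ((codepoint & 0x3ff) + Character.MIN_LOW_SURROGATE);
    // hi == '\uD83D' and lo == '\uDC31', matching the "\uD83D\uDC31" literal in
    // the test, so decoding now agrees with Java's own UTF-16 representation.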

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Mon Dec 10 03:37:32 2012
@@ -199,7 +199,8 @@ public abstract class Server {
   //     in ObjectWritable to efficiently transmit arrays of primitives
   // 6 : Made RPC payload header explicit
   // 7 : Changed Ipc Connection Header to use Protocol buffers
-  public static final byte CURRENT_VERSION = 7;
+  // 8 : SASL server always sends a final response
+  public static final byte CURRENT_VERSION = 8;
 
   /**
    * Initial and max size of response buffer
@@ -1220,8 +1221,8 @@ public abstract class Server {
           AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
           throw e;
         }
-        if (replyToken == null && authMethod == AuthMethod.PLAIN) {
-          // client needs at least response to know if it should use SIMPLE
+        if (saslServer.isComplete() && replyToken == null) {
+          // send final response for success
           replyToken = new byte[0];
         }
         if (replyToken != null) {
@@ -1392,7 +1393,7 @@ public abstract class Server {
     }
 
     private AuthMethod initializeAuthContext(AuthMethod authMethod)
-        throws IOException {
+        throws IOException, InterruptedException {
       try {
         if (enabledAuthMethods.contains(authMethod)) {
           saslServer = createSaslServer(authMethod);
@@ -1425,8 +1426,7 @@ public abstract class Server {
     }
 
     private SaslServer createSaslServer(AuthMethod authMethod)
-        throws IOException {
-      SaslServer saslServer = null;
+        throws IOException, InterruptedException {
       String hostname = null;
       String saslProtocol = null;
       CallbackHandler saslCallback = null;
@@ -1462,10 +1462,23 @@ public abstract class Server {
               "Server does not support SASL " + authMethod);
       }
       
-      String mechanism = authMethod.getMechanismName();
-      saslServer = Sasl.createSaslServer(
-          mechanism, saslProtocol, hostname,
-          SaslRpcServer.SASL_PROPS, saslCallback);
+      return createSaslServer(authMethod.getMechanismName(), saslProtocol,
+                              hostname, saslCallback);                                    
+    }
+
+    private SaslServer createSaslServer(final String mechanism,
+                                        final String protocol,
+                                        final String hostname,
+                                        final CallbackHandler callback
+        ) throws IOException, InterruptedException {
+      SaslServer saslServer = UserGroupInformation.getCurrentUser().doAs(
+          new PrivilegedExceptionAction<SaslServer>() {
+            @Override
+            public SaslServer run() throws SaslException  {
+              return Sasl.createSaslServer(mechanism, protocol, hostname,
+                                           SaslRpcServer.SASL_PROPS, callback);
+            }
+          });
       if (saslServer == null) {
         throw new AccessControlException(
             "Unable to find SASL server implementation for " + mechanism);

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsSystem.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsSystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricsSystem.java Mon Dec 10 03:37:32 2012
@@ -91,6 +91,17 @@ public abstract class MetricsSystem impl
   public abstract void register(Callback callback);
 
   /**
+   * Requests an immediate publish of all metrics from sources to sinks.
+   * 
+   * This is a "soft" request: the expectation is that a best effort will be
+   * done to synchronously snapshot the metrics from all the sources and put
+   * them in all the sinks (including flushing the sinks) before returning to
+   * the caller. If this can't be accomplished in reasonable time it's OK to
+   * return to the caller before everything is done. 
+   */
+  public abstract void publishMetricsNow();
+
+  /**
    * Shutdown the metrics system completely (usually during server shutdown.)
    * The MetricsSystemMXBean will be unregistered.
    * @return true if shutdown completed
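
  A hedged usage sketch for the new publishMetricsNow() call (HADOOP-9090). The
  "MyApp" prefix and the use of DefaultMetricsSystem to obtain the instance are
  illustrative assumptions, not part of this change:

    import org.apache.hadoop.metrics2.MetricsSystem;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;

    public class PublishBeforeShutdown {
      public static void main(String[] args) {
        MetricsSystem ms = DefaultMetricsSystem.initialize("MyApp");
        // ... register sources and sinks, do some work ...
        ms.publishMetricsNow();  // best-effort synchronous snapshot and sink flush
        ms.stop();
      }
    }

  The tests modified below (TestGangliaMetrics, TestMetricsSystemImpl) switch from
  ms.onTimerEvent() to exactly this call, which is why their latch-based
  synchronization could be removed.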

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java Mon Dec 10 03:37:32 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.metrics2.impl;
 
 import java.util.Random;
+import java.util.concurrent.*;
 
 import static com.google.common.base.Preconditions.*;
 
@@ -48,6 +49,7 @@ class MetricsSinkAdapter implements Sink
   private volatile boolean stopping = false;
   private volatile boolean inError = false;
   private final int period, firstRetryDelay, retryCount;
+  private final long oobPutTimeout;
   private final float retryBackoff;
   private final MetricsRegistry registry = new MetricsRegistry("sinkadapter");
   private final MutableStat latency;
@@ -69,6 +71,8 @@ class MetricsSinkAdapter implements Sink
     this.period = checkArg(period, period > 0, "period");
     firstRetryDelay = checkArg(retryDelay, retryDelay > 0, "retry delay");
     this.retryBackoff = checkArg(retryBackoff, retryBackoff>1, "retry backoff");
+    oobPutTimeout = (long)
+        (firstRetryDelay * Math.pow(retryBackoff, retryCount) * 1000);
     this.retryCount = retryCount;
     this.queue = new SinkQueue<MetricsBuffer>(checkArg(queueCapacity,
         queueCapacity > 0, "queue capacity"));
@@ -95,6 +99,23 @@ class MetricsSinkAdapter implements Sink
     }
     return true; // OK
   }
+  
+  public boolean putMetricsImmediate(MetricsBuffer buffer) {
+    WaitableMetricsBuffer waitableBuffer =
+        new WaitableMetricsBuffer(buffer);
+    if (!queue.enqueue(waitableBuffer)) {
+      LOG.warn(name + " has a full queue and can't consume the given metrics.");
+      dropped.incr();
+      return false;
+    }
+    if (!waitableBuffer.waitTillNotified(oobPutTimeout)) {
+      LOG.warn(name +
+          " couldn't fulfill an immediate putMetrics request in time." +
+          " Abandoning.");
+      return false;
+    }
+    return true;
+  }
 
   void publishMetricsFromQueue() {
     int retryDelay = firstRetryDelay;
@@ -158,6 +179,9 @@ class MetricsSinkAdapter implements Sink
       sink.flush();
       latency.add(Time.now() - ts);
     }
+    if (buffer instanceof WaitableMetricsBuffer) {
+      ((WaitableMetricsBuffer)buffer).notifyAnyWaiters();
+    }
     LOG.debug("Done");
   }
 
@@ -191,4 +215,26 @@ class MetricsSinkAdapter implements Sink
   MetricsSink sink() {
     return sink;
   }
+
+  static class WaitableMetricsBuffer extends MetricsBuffer {
+    private final Semaphore notificationSemaphore =
+        new Semaphore(0);
+
+    public WaitableMetricsBuffer(MetricsBuffer metricsBuffer) {
+      super(metricsBuffer);
+    }
+
+    public boolean waitTillNotified(long millisecondsToWait) {
+      try {
+        return notificationSemaphore.tryAcquire(millisecondsToWait,
+            TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        return false;
+      }
+    }
+
+    public void notifyAnyWaiters() {
+      notificationSemaphore.release();
+    }
+  }
 }
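
  The timeout bounding putMetricsImmediate() above is derived from the existing
  retry settings. A worked example using the retry.delay=1, retry.backoff=1.01,
  retry.count=0 values that the hanging-sink test later in this commit configures:

    // Mirrors the oobPutTimeout computation added to MetricsSinkAdapter.
    long firstRetryDelay = 1;     // seconds
    float retryBackoff = 1.01f;
    int retryCount = 0;
    long oobPutTimeout =
        (long) (firstRetryDelay * Math.pow(retryBackoff, retryCount) * 1000);
    // oobPutTimeout == 1000 ms: an immediate put waits at most as long as the
    // normal retry loop could take before warning and giving up.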

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java Mon Dec 10 03:37:32 2012
@@ -344,9 +344,19 @@ public class MetricsSystemImpl extends M
   synchronized void onTimerEvent() {
     logicalTime += period;
     if (sinks.size() > 0) {
-      publishMetrics(sampleMetrics());
+      publishMetrics(sampleMetrics(), false);
     }
   }
+  
+  /**
+   * Requests an immediate publish of all metrics from sources to sinks.
+   */
+  @Override
+  public void publishMetricsNow() {
+    if (sinks.size() > 0) {
+      publishMetrics(sampleMetrics(), true);
+    }    
+  }
 
   /**
    * Sample all the sources for a snapshot of metrics/tags
@@ -380,12 +390,20 @@ public class MetricsSystemImpl extends M
   /**
    * Publish a metrics snapshot to all the sinks
    * @param buffer  the metrics snapshot to publish
+   * @param immediate  indicates that we should publish metrics immediately
+   *                   instead of using a separate thread.
    */
-  synchronized void publishMetrics(MetricsBuffer buffer) {
+  synchronized void publishMetrics(MetricsBuffer buffer, boolean immediate) {
     int dropped = 0;
     for (MetricsSinkAdapter sa : sinks.values()) {
       long startTime = Time.now();
-      dropped += sa.putMetrics(buffer, logicalTime) ? 0 : 1;
+      boolean result;
+      if (immediate) {
+        result = sa.putMetricsImmediate(buffer); 
+      } else {
+        result = sa.putMetrics(buffer, logicalTime);
+      }
+      dropped += result ? 0 : 1;
       publishStat.add(Time.now() - startTime);
     }
     droppedPubAll.incr(dropped);

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java Mon Dec 10 03:37:32 2012
@@ -299,13 +299,17 @@ public class UserGroupInformation {
   
   private static String OS_LOGIN_MODULE_NAME;
   private static Class<? extends Principal> OS_PRINCIPAL_CLASS;
-  private static final boolean windows = 
-                           System.getProperty("os.name").startsWith("Windows");
+  private static final boolean windows =
+      System.getProperty("os.name").startsWith("Windows");
+  private static final boolean is64Bit =
+      System.getProperty("os.arch").contains("64");
   /* Return the OS login module class name */
   private static String getOSLoginModuleName() {
     if (System.getProperty("java.vendor").contains("IBM")) {
-      return windows ? "com.ibm.security.auth.module.NTLoginModule"
-       : "com.ibm.security.auth.module.LinuxLoginModule";
+      return windows ? (is64Bit
+          ? "com.ibm.security.auth.module.Win64LoginModule"
+          : "com.ibm.security.auth.module.NTLoginModule")
+        : "com.ibm.security.auth.module.LinuxLoginModule";
     } else {
       return windows ? "com.sun.security.auth.module.NTLoginModule"
         : "com.sun.security.auth.module.UnixLoginModule";
@@ -319,13 +323,13 @@ public class UserGroupInformation {
     try {
       if (System.getProperty("java.vendor").contains("IBM")) {
         if (windows) {
-          return (Class<? extends Principal>)
-            cl.loadClass("com.ibm.security.auth.UsernamePrincipal");
+          return (Class<? extends Principal>) (is64Bit
+            ? cl.loadClass("com.ibm.security.auth.UsernamePrincipal")
+            : cl.loadClass("com.ibm.security.auth.NTUserPrincipal"));
         } else {
-          return (Class<? extends Principal>)
-            (System.getProperty("os.arch").contains("64")
-             ? cl.loadClass("com.ibm.security.auth.UsernamePrincipal")
-             : cl.loadClass("com.ibm.security.auth.LinuxPrincipal"));
+          return (Class<? extends Principal>) (is64Bit
+            ? cl.loadClass("com.ibm.security.auth.UsernamePrincipal")
+            : cl.loadClass("com.ibm.security.auth.LinuxPrincipal"));
         }
       } else {
         return (Class<? extends Principal>) (windows

Propchange: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/core/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/core:r1415804-1419190

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextTestHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextTestHelper.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextTestHelper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextTestHelper.java Mon Dec 10 03:37:32 2012
@@ -32,7 +32,7 @@ import org.junit.Assert;
  */
 public final class FileContextTestHelper {
   // The test root is relative to the <wd>/build/test/data by default
-  public static final String TEST_ROOT_DIR = 
+  public static String TEST_ROOT_DIR = 
     System.getProperty("test.build.data", "build/test/data") + "/test";
   private static final int DEFAULT_BLOCK_SIZE = 1024;
   private static final int DEFAULT_NUM_BLOCKS = 2;

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestHelper.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestHelper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestHelper.java Mon Dec 10 03:37:32 2012
@@ -34,7 +34,7 @@ import static org.mockito.Mockito.mock;
  */
 public final class FileSystemTestHelper {
   // The test root is relative to the <wd>/build/test/data by default
-  public static final String TEST_ROOT_DIR = 
+  public static String TEST_ROOT_DIR = 
     System.getProperty("test.build.data", "target/test/data") + "/test";
   private static final int DEFAULT_BLOCK_SIZE = 1024;
   private static final int DEFAULT_NUM_BLOCKS = 2;

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestUTF8.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestUTF8.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestUTF8.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestUTF8.java Mon Dec 10 03:37:32 2012
@@ -19,8 +19,13 @@
 package org.apache.hadoop.io;
 
 import junit.framework.TestCase;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
 import java.util.Random;
 
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.StringUtils;
+
 /** Unit tests for UTF8. */
 @SuppressWarnings("deprecation")
 public class TestUTF8 extends TestCase {
@@ -92,5 +97,73 @@ public class TestUTF8 extends TestCase {
 
     assertEquals(s, new String(dob.getData(), 2, dob.getLength()-2, "UTF-8"));
   }
-	
+
+  /**
+   * Test encoding and decoding of UTF8 outside the basic multilingual plane.
+   *
+   * This is a regression test for HADOOP-9103.
+   */
+  public void testNonBasicMultilingualPlane() throws Exception {
+    // Test using the "CAT FACE" character (U+1F431)
+    // See http://www.fileformat.info/info/unicode/char/1f431/index.htm
+    String catFace = "\uD83D\uDC31";
+
+    // This encodes to 4 bytes in UTF-8:
+    byte[] encoded = catFace.getBytes("UTF-8");
+    assertEquals(4, encoded.length);
+    assertEquals("f09f90b1", StringUtils.byteToHexString(encoded));
+
+    // Decode back to String using our own decoder
+    String roundTrip = UTF8.fromBytes(encoded);
+    assertEquals(catFace, roundTrip);
+  }
+
+  /**
+   * Test that decoding invalid UTF8 throws an appropriate error message.
+   */
+  public void testInvalidUTF8() throws Exception {
+    byte[] invalid = new byte[] {
+        0x01, 0x02, (byte)0xff, (byte)0xff, 0x01, 0x02, 0x03, 0x04, 0x05 };
+    try {
+      UTF8.fromBytes(invalid);
+      fail("did not throw an exception");
+    } catch (UTFDataFormatException utfde) {
+      GenericTestUtils.assertExceptionContains(
+          "Invalid UTF8 at ffff01020304", utfde);
+    }
+  }
+
+  /**
+   * Test for a 5-byte UTF8 sequence, which is now considered illegal.
+   */
+  public void test5ByteUtf8Sequence() throws Exception {
+    byte[] invalid = new byte[] {
+        0x01, 0x02, (byte)0xf8, (byte)0x88, (byte)0x80,
+        (byte)0x80, (byte)0x80, 0x04, 0x05 };
+    try {
+      UTF8.fromBytes(invalid);
+      fail("did not throw an exception");
+    } catch (UTFDataFormatException utfde) {
+      GenericTestUtils.assertExceptionContains(
+          "Invalid UTF8 at f88880808004", utfde);
+    }
+  }
+  
+  /**
+   * Test that decoding invalid UTF8 due to truncation yields the correct
+   * exception type.
+   */
+  public void testInvalidUTF8Truncated() throws Exception {
+    // Truncated CAT FACE character -- this is a 4-byte sequence, but we
+    // only have the first three bytes.
+    byte[] truncated = new byte[] {
+        (byte)0xF0, (byte)0x9F, (byte)0x90 };
+    try {
+      UTF8.fromBytes(truncated);
+      fail("did not throw an exception");
+    } catch (UTFDataFormatException utfde) {
+      GenericTestUtils.assertExceptionContains(
+          "Truncated UTF8 at f09f90", utfde);
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java Mon Dec 10 03:37:32 2012
@@ -29,8 +29,6 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -115,31 +113,23 @@ public class TestGangliaMetrics {
     final int expectedCountFromGanglia30 = expectedMetrics.length;
     final int expectedCountFromGanglia31 = 2 * expectedMetrics.length;
 
-    // use latch to make sure we received required records before shutting
-    // down the MetricSystem
-    CountDownLatch latch = new CountDownLatch(
-        expectedCountFromGanglia30 + expectedCountFromGanglia31);
-
     // Setup test for GangliaSink30
     AbstractGangliaSink gsink30 = new GangliaSink30();
     gsink30.init(cb.subset("test"));
-    MockDatagramSocket mockds30 = new MockDatagramSocket(latch);
+    MockDatagramSocket mockds30 = new MockDatagramSocket();
     GangliaMetricsTestHelper.setDatagramSocket(gsink30, mockds30);
 
     // Setup test for GangliaSink31
     AbstractGangliaSink gsink31 = new GangliaSink31();
     gsink31.init(cb.subset("test"));
-    MockDatagramSocket mockds31 = new MockDatagramSocket(latch);
+    MockDatagramSocket mockds31 = new MockDatagramSocket();
     GangliaMetricsTestHelper.setDatagramSocket(gsink31, mockds31);
 
     // register the sinks
     ms.register("gsink30", "gsink30 desc", gsink30);
     ms.register("gsink31", "gsink31 desc", gsink31);
-    ms.onTimerEvent();  // trigger something interesting
+    ms.publishMetricsNow(); // publish the metrics
 
-    // wait for all records and the stop MetricSystem.  Without this
-    // sometime the ms gets shutdown before all the sinks have consumed
-    latch.await(200, TimeUnit.MILLISECONDS);
     ms.stop();
 
     // check GanfliaSink30 data
@@ -198,7 +188,6 @@ public class TestGangliaMetrics {
    */
   private class MockDatagramSocket extends DatagramSocket {
     private ArrayList<byte[]> capture;
-    private CountDownLatch latch;
 
     /**
      * @throws SocketException
@@ -207,15 +196,6 @@ public class TestGangliaMetrics {
       capture = new  ArrayList<byte[]>();
     }
 
-    /**
-     * @param latch
-     * @throws SocketException
-     */
-    public MockDatagramSocket(CountDownLatch latch) throws SocketException {
-      this();
-      this.latch = latch;
-    }
-
     /* (non-Javadoc)
      * @see java.net.DatagramSocket#send(java.net.DatagramPacket)
      */
@@ -225,9 +205,6 @@ public class TestGangliaMetrics {
       byte[] bytes = new byte[p.getLength()];
       System.arraycopy(p.getData(), p.getOffset(), bytes, 0, p.getLength());
       capture.add(bytes);
-
-      // decrement the latch
-      latch.countDown();
     }
 
     /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java?rev=1419193&r1=1419192&r2=1419193&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java Mon Dec 10 03:37:32 2012
@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.metrics2.impl;
 
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import javax.annotation.Nullable;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -26,9 +30,11 @@ import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.runners.MockitoJUnitRunner;
+
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
+import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 
 import org.apache.commons.configuration.SubsetConfiguration;
@@ -36,6 +42,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metrics2.MetricsException;
 import static org.apache.hadoop.test.MoreAsserts.*;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsSource;
@@ -47,6 +55,7 @@ import org.apache.hadoop.metrics2.lib.Me
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Test the MetricsSystemImpl class
@@ -72,7 +81,7 @@ public class TestMetricsSystemImpl {
   }
 
   @Test public void testInitFirst() throws Exception {
-    ConfigBuilder cb = new ConfigBuilder().add("*.period", 8)
+    new ConfigBuilder().add("*.period", 8)
         //.add("test.sink.plugin.urls", getPluginUrlsAsString())
         .add("test.sink.test.class", TestSink.class.getName())
         .add("test.*.source.filter.exclude", "s0")
@@ -93,8 +102,9 @@ public class TestMetricsSystemImpl {
     MetricsSink sink2 = mock(MetricsSink.class);
     ms.registerSink("sink1", "sink1 desc", sink1);
     ms.registerSink("sink2", "sink2 desc", sink2);
-    ms.onTimerEvent();  // trigger something interesting
+    ms.publishMetricsNow(); // publish the metrics
     ms.stop();
+    ms.shutdown();
 
     verify(sink1, times(2)).putMetrics(r1.capture());
     List<MetricsRecord> mr1 = r1.getAllValues();
@@ -104,6 +114,177 @@ public class TestMetricsSystemImpl {
     assertEquals("output", mr1, mr2);
   }
 
+  @Test public void testMultiThreadedPublish() throws Exception {
+    new ConfigBuilder().add("*.period", 80)
+      .add("test.sink.Collector.queue.capacity", "20")
+      .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+    final MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+    ms.start();
+    final int numThreads = 10;
+    final CollectingSink sink = new CollectingSink(numThreads);
+    ms.registerSink("Collector",
+        "Collector of values from all threads.", sink);
+    final TestSource[] sources = new TestSource[numThreads];
+    final Thread[] threads = new Thread[numThreads];
+    final String[] results = new String[numThreads];
+    final CyclicBarrier barrier1 = new CyclicBarrier(numThreads),
+        barrier2 = new CyclicBarrier(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      sources[i] = ms.register("threadSource" + i,
+          "A source of my threaded goodness.",
+          new TestSource("threadSourceRec" + i));
+      threads[i] = new Thread(new Runnable() {
+        private boolean safeAwait(int mySource, CyclicBarrier barrier) {
+          try {
+            barrier1.await(2, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            results[mySource] = "Interrupted";
+            return false;
+          } catch (BrokenBarrierException e) {
+            results[mySource] = "Broken Barrier";
+            return false;
+          } catch (TimeoutException e) {
+            results[mySource] = "Timed out on barrier";
+            return false;
+          }
+          return true;
+        }
+        
+        @Override
+        public void run() {
+          int mySource = Integer.parseInt(Thread.currentThread().getName());
+          if (sink.collected[mySource].get() != 0L) {
+            results[mySource] = "Someone else collected my metric!";
+            return;
+          }
+          // Wait for all the threads to come here so we can hammer
+          // the system at the same time
+          if (!safeAwait(mySource, barrier1)) return;
+          sources[mySource].g1.set(230);
+          ms.publishMetricsNow();
+          // Since some other thread may have snatched my metric,
+          // I need to wait for the threads to finish before checking.
+          if (!safeAwait(mySource, barrier2)) return;
+          if (sink.collected[mySource].get() != 230L) {
+            results[mySource] = "Metric not collected!";
+            return;
+          }
+          results[mySource] = "Passed";
+        }
+      }, "" + i);
+    }
+    for (Thread t : threads)
+      t.start();
+    for (Thread t : threads)
+      t.join();
+    assertEquals(0L, ms.droppedPubAll.value());
+    assertTrue(StringUtils.join("\n", Arrays.asList(results)),
+      Iterables.all(Arrays.asList(results), new Predicate<String>() {
+        @Override
+        public boolean apply(@Nullable String input) {
+          return input.equalsIgnoreCase("Passed");
+        }
+      }));
+    ms.stop();
+    ms.shutdown();
+  }
+
+  private static class CollectingSink implements MetricsSink {
+    private final AtomicLong[] collected;
+    
+    public CollectingSink(int capacity) {
+      collected = new AtomicLong[capacity];
+      for (int i = 0; i < capacity; i++) {
+        collected[i] = new AtomicLong();
+      }
+    }
+    
+    @Override
+    public void init(SubsetConfiguration conf) {
+    }
+
+    @Override
+    public void putMetrics(MetricsRecord record) {
+      final String prefix = "threadSourceRec";
+      if (record.name().startsWith(prefix)) {
+        final int recordNumber = Integer.parseInt(
+            record.name().substring(prefix.length()));
+        ArrayList<String> names = new ArrayList<String>();
+        for (AbstractMetric m : record.metrics()) {
+          if (m.name().equalsIgnoreCase("g1")) {
+            collected[recordNumber].set(m.value().longValue());
+            return;
+          }
+          names.add(m.name());
+        }
+      }
+    }
+
+    @Override
+    public void flush() {
+    }
+  }
+
+  @Test public void testHangingSink() {
+    new ConfigBuilder().add("*.period", 8)
+      .add("test.sink.test.class", TestSink.class.getName())
+      .add("test.sink.hanging.retry.delay", "1")
+      .add("test.sink.hanging.retry.backoff", "1.01")
+      .add("test.sink.hanging.retry.count", "0")
+      .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+    MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+    ms.start();
+    TestSource s = ms.register("s3", "s3 desc", new TestSource("s3rec"));
+    s.c1.incr();
+    HangingSink hanging = new HangingSink();
+    ms.registerSink("hanging", "Hang the sink!", hanging);
+    ms.publishMetricsNow();
+    assertEquals(1L, ms.droppedPubAll.value());
+    assertFalse(hanging.getInterrupted());
+    ms.stop();
+    ms.shutdown();
+    assertTrue(hanging.getInterrupted());
+    assertTrue("The sink didn't get called after its first hang " +
+               "for subsequent records.", hanging.getGotCalledSecondTime());
+  }
+
+  private static class HangingSink implements MetricsSink {
+    private volatile boolean interrupted;
+    private boolean gotCalledSecondTime;
+    private boolean firstTime = true;
+
+    public boolean getGotCalledSecondTime() {
+      return gotCalledSecondTime;
+    }
+
+    public boolean getInterrupted() {
+      return interrupted;
+    }
+
+    @Override
+    public void init(SubsetConfiguration conf) {
+    }
+
+    @Override
+    public void putMetrics(MetricsRecord record) {
+      // No need to hang every time, just the first record.
+      if (!firstTime) {
+        gotCalledSecondTime = true;
+        return;
+      }
+      firstTime = false;
+      try {
+        Thread.sleep(10 * 1000);
+      } catch (InterruptedException ex) {
+        interrupted = true;
+      }
+    }
+
+    @Override
+    public void flush() {
+    }
+  }
+
   @Test public void testRegisterDups() {
     MetricsSystem ms = new MetricsSystemImpl();
     TestSource ts1 = new TestSource("ts1");
@@ -116,6 +297,7 @@ public class TestMetricsSystemImpl {
     MetricsSource s2 = ms.getSource("ts1");
     assertNotNull(s2);
     assertNotSame(s1, s2);
+    ms.shutdown();
   }
 
   @Test(expected=MetricsException.class) public void testRegisterDupError() {


