Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 97E7010738 for ; Fri, 14 Mar 2014 21:19:56 +0000 (UTC) Received: (qmail 6788 invoked by uid 500); 14 Mar 2014 21:19:54 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 6430 invoked by uid 500); 14 Mar 2014 21:19:53 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 5955 invoked by uid 99); 14 Mar 2014 21:19:52 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Mar 2014 21:19:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Mar 2014 21:19:50 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4255B23889FA; Fri, 14 Mar 2014 21:19:30 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1577720 - in /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common: ./ src/ src/main/java/ src/main/java/org/apache/hadoop/ipc/ src/test/java/org/apache/hadoop/ipc/ Date: Fri, 14 Mar 2014 21:19:29 -0000 To: common-commits@hadoop.apache.org From: szetszwo@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140314211930.4255B23889FA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: szetszwo Date: Fri Mar 14 21:19:29 2014 New Revision: 1577720 URL: http://svn.apache.org/r1577720 
Log: svn merge -c 1577710 from trunk for HADOOP-10407. Fix the javac warnings in org.apache.hadoop.ipc package. Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/ (props changed) hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (contents, props changed) hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/ (props changed) hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/ (props changed) hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common:r1577710 Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1577720&r1=1577719&r2=1577720&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Fri Mar 14 21:19:29 2014 @@ -120,6 +120,9 @@ Release 2.4.0 - UNRELEASED HADOOP-10337 ConcurrentModificationException from MetricsDynamicMBeanBase.createMBeanInfo() (Liang Xie via stack) + HADOOP-10407. Fix the javac warnings in org.apache.hadoop.ipc package. 
+ (szetszwo) + BREAKDOWN OF HADOOP-10184 SUBTASKS AND RELATED JIRAS HADOOP-10185. FileSystem API for ACLs. (cnauroth) Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1577710 Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src:r1577710 Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1577710 Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java?rev=1577720&r1=1577719&r2=1577720&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java Fri Mar 14 21:19:29 2014 @@ -18,15 +18,13 @@ package org.apache.hadoop.ipc; +import java.lang.reflect.Constructor; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.TimeUnit; - -import java.lang.reflect.Constructor; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - 
import org.apache.hadoop.conf.Configuration; /** @@ -35,13 +33,19 @@ import org.apache.hadoop.conf.Configurat public class CallQueueManager { public static final Log LOG = LogFactory.getLog(CallQueueManager.class); + @SuppressWarnings("unchecked") + static Class> convertQueueClass( + Class queneClass, Class elementClass) { + return (Class>)queneClass; + } + // Atomic refs point to active callQueue // We have two so we can better control swapping private final AtomicReference> putRef; private final AtomicReference> takeRef; - public CallQueueManager(Class backingClass, int maxQueueSize, - String namespace, Configuration conf) { + public CallQueueManager(Class> backingClass, + int maxQueueSize, String namespace, Configuration conf) { BlockingQueue bq = createCallQueueInstance(backingClass, maxQueueSize, namespace, conf); this.putRef = new AtomicReference>(bq); @@ -49,15 +53,14 @@ public class CallQueueManager { LOG.info("Using callQueue " + backingClass); } - @SuppressWarnings("unchecked") - private BlockingQueue createCallQueueInstance(Class theClass, int maxLen, - String ns, Configuration conf) { + private > T createCallQueueInstance( + Class theClass, int maxLen, String ns, Configuration conf) { // Used for custom, configurable callqueues try { - Constructor ctor = theClass.getDeclaredConstructor(int.class, String.class, + Constructor ctor = theClass.getDeclaredConstructor(int.class, String.class, Configuration.class); - return (BlockingQueue)ctor.newInstance(maxLen, ns, conf); + return ctor.newInstance(maxLen, ns, conf); } catch (RuntimeException e) { throw e; } catch (Exception e) { @@ -65,8 +68,8 @@ public class CallQueueManager { // Used for LinkedBlockingQueue, ArrayBlockingQueue, etc try { - Constructor ctor = theClass.getDeclaredConstructor(int.class); - return (BlockingQueue)ctor.newInstance(maxLen); + Constructor ctor = theClass.getDeclaredConstructor(int.class); + return ctor.newInstance(maxLen); } catch (RuntimeException e) { throw e; } catch (Exception 
e) { @@ -74,8 +77,8 @@ public class CallQueueManager { // Last attempt try { - Constructor ctor = theClass.getDeclaredConstructor(); - return (BlockingQueue)ctor.newInstance(); + Constructor ctor = theClass.getDeclaredConstructor(); + return ctor.newInstance(); } catch (RuntimeException e) { throw e; } catch (Exception e) { @@ -117,8 +120,9 @@ public class CallQueueManager { * Replaces active queue with the newly requested one and transfers * all calls to the newQ before returning. */ - public synchronized void swapQueue(Class queueClassToUse, int maxSize, - String ns, Configuration conf) { + public synchronized void swapQueue( + Class> queueClassToUse, int maxSize, + String ns, Configuration conf) { BlockingQueue newQ = createCallQueueInstance(queueClassToUse, maxSize, ns, conf); @@ -143,7 +147,7 @@ public class CallQueueManager { * This doesn't mean the queue might not fill up at some point later, but * it should decrease the probability that we lose a call this way. */ - private boolean queueIsReallyEmpty(BlockingQueue q) { + private boolean queueIsReallyEmpty(BlockingQueue q) { boolean wasEmpty = q.isEmpty(); try { Thread.sleep(10); Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1577720&r1=1577719&r2=1577720&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Fri Mar 14 21:19:29 2014 @@ -541,7 +541,7 @@ public class Client { } private synchronized AuthMethod setupSaslConnection(final InputStream in2, - final OutputStream out2) throws IOException, 
InterruptedException { + final OutputStream out2) throws IOException { // Do not use Client.conf here! We must use ConnectionId.conf, since the // Client object is cached and shared between all RPC clients, even those // for separate services. Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1577720&r1=1577719&r2=1577720&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Fri Mar 14 21:19:29 2014 @@ -18,6 +18,11 @@ package org.apache.hadoop.ipc; +import static org.apache.hadoop.ipc.RpcConstants.AUTHORIZATION_FAILED_CALL_ID; +import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID; +import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION; +import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -75,8 +80,6 @@ import org.apache.hadoop.fs.CommonConfig import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; -import static org.apache.hadoop.ipc.RpcConstants.*; - import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper; import org.apache.hadoop.ipc.RPC.RpcInvoker; @@ -467,17 +470,24 @@ public abstract class Server { return serviceAuthorizationManager; } + static Class> getQueueClass( + String prefix, Configuration conf) { + String name = prefix + "." 
+ CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY; + Class queueClass = conf.getClass(name, LinkedBlockingQueue.class); + return CallQueueManager.convertQueueClass(queueClass, Call.class); + } + + private String getQueueClassPrefix() { + return CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + port; + } + /* * Refresh the call queue */ public synchronized void refreshCallQueue(Configuration conf) { // Create the next queue - String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + - this.port; - Class queueClassToUse = conf.getClass(prefix + "." + - CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class); - - callQueue.swapQueue(queueClassToUse, maxQueueSize, prefix, conf); + String prefix = getQueueClassPrefix(); + callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf); } /** A call queued for handling. */ @@ -1225,9 +1235,9 @@ public abstract class Server { Throwable cause = e; while (cause != null) { if (cause instanceof RetriableException) { - return (RetriableException) cause; + return cause; } else if (cause instanceof StandbyException) { - return (StandbyException) cause; + return cause; } else if (cause instanceof InvalidToken) { // FIXME: hadoop method signatures are restricting the SASL // callbacks to only returning InvalidToken, but some services @@ -1297,7 +1307,7 @@ public abstract class Server { private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage) throws IOException, InterruptedException { - RpcSaslProto saslResponse = null; + final RpcSaslProto saslResponse; final SaslState state = saslMessage.getState(); // required switch (state) { case NEGOTIATE: { @@ -1333,27 +1343,18 @@ public abstract class Server { // SIMPLE is a legit option above. 
we will send no response if (authMethod == AuthMethod.SIMPLE) { switchToSimple(); + saslResponse = null; break; } // sasl server for tokens may already be instantiated if (saslServer == null || authMethod != AuthMethod.TOKEN) { saslServer = createSaslServer(authMethod); } - // fallthru to process sasl token + saslResponse = processSaslToken(saslMessage); + break; } case RESPONSE: { - if (!saslMessage.hasToken()) { - throw new SaslException("Client did not send a token"); - } - byte[] saslToken = saslMessage.getToken().toByteArray(); - if (LOG.isDebugEnabled()) { - LOG.debug("Have read input token of size " + saslToken.length - + " for processing by saslServer.evaluateResponse()"); - } - saslToken = saslServer.evaluateResponse(saslToken); - saslResponse = buildSaslResponse( - saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE, - saslToken); + saslResponse = processSaslToken(saslMessage); break; } default: @@ -1362,6 +1363,22 @@ public abstract class Server { return saslResponse; } + private RpcSaslProto processSaslToken(RpcSaslProto saslMessage) + throws SaslException { + if (!saslMessage.hasToken()) { + throw new SaslException("Client did not send a token"); + } + byte[] saslToken = saslMessage.getToken().toByteArray(); + if (LOG.isDebugEnabled()) { + LOG.debug("Have read input token of size " + saslToken.length + + " for processing by saslServer.evaluateResponse()"); + } + saslToken = saslServer.evaluateResponse(saslToken); + return buildSaslResponse( + saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE, + saslToken); + } + private void switchToSimple() { // disable SASL and blank out any SASL server authProtocol = AuthProtocol.NONE; @@ -2123,12 +2140,9 @@ public abstract class Server { CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT); // Setup appropriate callqueue - String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + - this.port; - Class queueClassToUse = conf.getClass(prefix + "." 
+ - CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class); - this.callQueue = new CallQueueManager(queueClassToUse, maxQueueSize, - prefix, conf); + final String prefix = getQueueClassPrefix(); + this.callQueue = new CallQueueManager(getQueueClass(prefix, conf), + maxQueueSize, prefix, conf); this.secretManager = (SecretManager) secretManager; this.authorize = Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java?rev=1577720&r1=1577719&r2=1577720&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java Fri Mar 14 21:19:29 2014 @@ -18,22 +18,15 @@ package org.apache.hadoop.ipc; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.HashMap; -import java.util.ArrayList; - import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import org.junit.Assert; -import org.junit.Assume; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + import org.junit.Test; -import org.junit.Before; -import org.junit.After; public class TestCallQueueManager { private CallQueueManager manager; @@ -146,23 +139,26 @@ public class TestCallQueueManager { } + private static final Class<? extends BlockingQueue<FakeCall>> queueClass + = 
CallQueueManager.convertQueueClass(LinkedBlockingQueue.class, FakeCall.class); + @Test public void testCallQueueCapacity() throws InterruptedException { - manager = new CallQueueManager(LinkedBlockingQueue.class, 10, "", null); + manager = new CallQueueManager(queueClass, 10, "", null); assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity } @Test public void testEmptyConsume() throws InterruptedException { - manager = new CallQueueManager(LinkedBlockingQueue.class, 10, "", null); + manager = new CallQueueManager(queueClass, 10, "", null); assertCanTake(manager, 0, 1); // Fails since it's empty } @Test(timeout=60000) public void testSwapUnderContention() throws InterruptedException { - manager = new CallQueueManager(LinkedBlockingQueue.class, 5000, "", null); + manager = new CallQueueManager(queueClass, 5000, "", null); ArrayList producers = new ArrayList(); ArrayList consumers = new ArrayList(); @@ -191,7 +187,7 @@ public class TestCallQueueManager { Thread.sleep(10); for (int i=0; i < 5; i++) { - manager.swapQueue(LinkedBlockingQueue.class, 5000, "", null); + manager.swapQueue(queueClass, 5000, "", null); } // Stop the producers