Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 14A60183CD for ; Wed, 27 Jan 2016 16:47:32 +0000 (UTC) Received: (qmail 8123 invoked by uid 500); 27 Jan 2016 16:39:49 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 8013 invoked by uid 500); 27 Jan 2016 16:39:49 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 7696 invoked by uid 99); 27 Jan 2016 16:39:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Jan 2016 16:39:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AEDDEE0974; Wed, 27 Jan 2016 16:39:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Wed, 27 Jan 2016 16:39:53 -0000 Message-Id: <8c6e12dc97b1446fac1ff7aad64c1896@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/30] hbase-site git commit: Published site at 845d00a16bc22cced0a2eead3d0ba48989968fb6. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6d411951/testdevapidocs/src-html/org/apache/hadoop/hbase/ipc/TestDelayedRpc.TestThread.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/ipc/TestDelayedRpc.TestThread.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/ipc/TestDelayedRpc.TestThread.html deleted file mode 100644 index c6a4fc3..0000000 --- a/testdevapidocs/src-html/org/apache/hadoop/hbase/ipc/TestDelayedRpc.TestThread.html +++ /dev/null @@ -1,439 +0,0 @@ - - - -Source code - - - -
-
001/**
-002 *
-003 * Licensed to the Apache Software Foundation (ASF) under one
-004 * or more contributor license agreements.  See the NOTICE file
-005 * distributed with this work for additional information
-006 * regarding copyright ownership.  The ASF licenses this file
-007 * to you under the Apache License, Version 2.0 (the
-008 * "License"); you may not use this file except in compliance
-009 * with the License.  You may obtain a copy of the License at
-010 *
-011 *     http://www.apache.org/licenses/LICENSE-2.0
-012 *
-013 * Unless required by applicable law or agreed to in writing, software
-014 * distributed under the License is distributed on an "AS IS" BASIS,
-015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-016 * See the License for the specific language governing permissions and
-017 * limitations under the License.
-018 */
-019package org.apache.hadoop.hbase.ipc;
-020
-021import static org.junit.Assert.assertEquals;
-022import static org.junit.Assert.assertFalse;
-023import static org.junit.Assert.assertTrue;
-024import static org.junit.Assert.fail;
-025
-026import java.io.IOException;
-027import java.net.InetSocketAddress;
-028import java.util.ArrayList;
-029import java.util.List;
-030
-031import org.apache.commons.logging.Log;
-032import org.apache.commons.logging.LogFactory;
-033import org.apache.hadoop.conf.Configuration;
-034import org.apache.hadoop.hbase.HBaseConfiguration;
-035import org.apache.hadoop.hbase.HConstants;
-036import org.apache.hadoop.hbase.testclassification.MediumTests;
-037import org.apache.hadoop.hbase.testclassification.RPCTests;
-038import org.apache.hadoop.hbase.ServerName;
-039import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
-040import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg;
-041import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse;
-042import org.apache.hadoop.hbase.security.User;
-043import org.apache.log4j.AppenderSkeleton;
-044import org.apache.log4j.Level;
-045import org.apache.log4j.Logger;
-046import org.apache.log4j.spi.LoggingEvent;
-047import org.junit.Test;
-048import org.junit.experimental.categories.Category;
-049
-050import com.google.common.collect.Lists;
-051import com.google.protobuf.BlockingRpcChannel;
-052import com.google.protobuf.BlockingService;
-053import com.google.protobuf.RpcController;
-054import com.google.protobuf.ServiceException;
-055
-056/**
-057 * Test that delayed RPCs work. Fire up three calls, the first of which should
-058 * be delayed. Check that the last two, which are undelayed, return before the
-059 * first one.
-060 */
-061@Category({RPCTests.class, MediumTests.class}) // Fails sometimes with small tests
-062public class TestDelayedRpc {
-063  private static final Log LOG = LogFactory.getLog(TestDelayedRpc.class);
-064  public static RpcServerInterface rpcServer;
-065  public static final int UNDELAYED = 0;
-066  public static final int DELAYED = 1;
-067  private static final int RPC_CLIENT_TIMEOUT = 30000;
-068
-069  @Test (timeout=60000)
-070  public void testDelayedRpcImmediateReturnValue() throws Exception {
-071    testDelayedRpc(false);
-072  }
-073
-074  @Test (timeout=60000)
-075  public void testDelayedRpcDelayedReturnValue() throws Exception {
-076    testDelayedRpc(true);
-077  }
-078
-079  private void testDelayedRpc(boolean delayReturnValue) throws Exception {
-080    LOG.info("Running testDelayedRpc delayReturnValue=" + delayReturnValue);
-081    Configuration conf = HBaseConfiguration.create();
-082    InetSocketAddress isa = new InetSocketAddress("localhost", 0);
-083    TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
-084    BlockingService service =
-085      TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
-086    rpcServer = new RpcServer(null, "testDelayedRpc",
-087        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
-088        isa,
-089        conf,
-090        new FifoRpcScheduler(conf, 1));
-091    rpcServer.start();
-092    RpcClient rpcClient = RpcClientFactory.createClient(
-093        conf, HConstants.DEFAULT_CLUSTER_ID.toString());
-094    try {
-095      InetSocketAddress address = rpcServer.getListenerAddress();
-096      if (address == null) {
-097        throw new IOException("Listener channel is closed");
-098      }
-099      BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
-100          ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
-101          User.getCurrent(), RPC_CLIENT_TIMEOUT);
-102      TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
-103        TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
-104      List<Integer> results = new ArrayList<Integer>();
-105      // Setting true sets 'delayed' on the client.
-106      TestThread th1 = new TestThread(stub, true, results);
-107      // Setting 'false' means we will return UNDELAYED as response immediately.
-108      TestThread th2 = new TestThread(stub, false, results);
-109      TestThread th3 = new TestThread(stub, false, results);
-110      th1.start();
-111      Thread.sleep(100);
-112      th2.start();
-113      Thread.sleep(200);
-114      th3.start();
-115
-116      th1.join();
-117      th2.join();
-118      th3.join();
-119
-120      // We should get the two undelayed responses first.
-121      assertEquals(UNDELAYED, results.get(0).intValue());
-122      assertEquals(UNDELAYED, results.get(1).intValue());
-123      assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :  0xDEADBEEF);
-124    } finally {
-125      rpcClient.close();
-126    }
-127  }
-128
-129  private static class ListAppender extends AppenderSkeleton {
-130    private final List<String> messages = new ArrayList<String>();
-131
-132    @Override
-133    protected void append(LoggingEvent event) {
-134      messages.add(event.getMessage().toString());
-135    }
-136
-137    @Override
-138    public void close() {
-139    }
-140
-141    @Override
-142    public boolean requiresLayout() {
-143      return false;
-144    }
-145
-146    public List<String> getMessages() {
-147      return messages;
-148    }
-149  }
-150
-151  /**
-152   * Tests that we see a WARN message in the logs.
-153   * @throws Exception
-154   */
-155  @Test (timeout=60000)
-156  public void testTooManyDelayedRpcs() throws Exception {
-157    Configuration conf = HBaseConfiguration.create();
-158    final int MAX_DELAYED_RPC = 10;
-159    conf.setInt("hbase.ipc.warn.delayedrpc.number", MAX_DELAYED_RPC);
-160    // Set up an appender to catch the "Too many delayed calls" that we expect.
-161    ListAppender listAppender = new ListAppender();
-162    Logger log = Logger.getLogger(RpcServer.class);
-163    log.addAppender(listAppender);
-164    log.setLevel(Level.WARN);
-165
-166
-167    InetSocketAddress isa = new InetSocketAddress("localhost", 0);
-168    TestDelayedImplementation instance = new TestDelayedImplementation(true);
-169    BlockingService service =
-170      TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
-171    rpcServer = new RpcServer(null, "testTooManyDelayedRpcs",
-172      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
-173        isa,
-174        conf,
-175        new FifoRpcScheduler(conf, 1));
-176    rpcServer.start();
-177    RpcClient rpcClient = RpcClientFactory.createClient(
-178        conf, HConstants.DEFAULT_CLUSTER_ID.toString());
-179    try {
-180      InetSocketAddress address = rpcServer.getListenerAddress();
-181      if (address == null) {
-182        throw new IOException("Listener channel is closed");
-183      }
-184      BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
-185          ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
-186          User.getCurrent(), RPC_CLIENT_TIMEOUT);
-187      TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
-188        TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
-189      Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
-190      for (int i = 0; i < MAX_DELAYED_RPC; i++) {
-191        threads[i] = new TestThread(stub, true, null);
-192        threads[i].start();
-193      }
-194
-195      /* No warnings till here. */
-196      assertTrue(listAppender.getMessages().isEmpty());
-197
-198      /* This should give a warning. */
-199      threads[MAX_DELAYED_RPC] = new TestThread(stub, true, null);
-200      threads[MAX_DELAYED_RPC].start();
-201
-202      for (int i = 0; i < MAX_DELAYED_RPC; i++) {
-203        threads[i].join();
-204      }
-205
-206      assertFalse(listAppender.getMessages().isEmpty());
-207      assertTrue(listAppender.getMessages().get(0).startsWith("Too many delayed calls"));
-208
-209      log.removeAppender(listAppender);
-210    } finally {
-211      rpcClient.close();
-212    }
-213  }
-214
-215  public static class TestDelayedImplementation
-216  implements TestDelayedRpcProtos.TestDelayedService.BlockingInterface {
-217    /**
-218     * Should the return value of delayed call be set at the end of the delay
-219     * or at call return.
-220     */
-221    private final boolean delayReturnValue;
-222
-223    /**
-224     * @param delayReturnValue Should the response to the delayed call be set
-225     * at the start or the end of the delay.
-226     */
-227    public TestDelayedImplementation(boolean delayReturnValue) {
-228      this.delayReturnValue = delayReturnValue;
-229    }
-230
-231    @Override
-232    public TestResponse test(final RpcController rpcController, final TestArg testArg)
-233    throws ServiceException {
-234      boolean delay = testArg.getDelay();
-235      TestResponse.Builder responseBuilder = TestResponse.newBuilder();
-236      if (!delay) {
-237        responseBuilder.setResponse(UNDELAYED);
-238        return responseBuilder.build();
-239      }
-240      final Delayable call = RpcServer.getCurrentCall();
-241      call.startDelay(delayReturnValue);
-242      new Thread() {
-243        @Override
-244        public void run() {
-245          try {
-246            Thread.sleep(500);
-247            TestResponse.Builder responseBuilder = TestResponse.newBuilder();
-248            call.endDelay(delayReturnValue ?
-249                responseBuilder.setResponse(DELAYED).build() : null);
-250          } catch (Exception e) {
-251            e.printStackTrace();
-252          }
-253        }
-254      }.start();
-255      // This value should go back to client only if the response is set
-256      // immediately at delay time.
-257      responseBuilder.setResponse(0xDEADBEEF);
-258      return responseBuilder.build();
-259    }
-260  }
-261
-262  public static class TestThread extends Thread {
-263    private final TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub;
-264    private final boolean delay;
-265    private final List<Integer> results;
-266
-267    public TestThread(TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub,
-268        boolean delay, List<Integer> results) {
-269      this.stub = stub;
-270      this.delay = delay;
-271      this.results = results;
-272    }
-273
-274    @Override
-275    public void run() {
-276      Integer result;
-277      try {
-278        result = new Integer(stub.test(null, TestArg.newBuilder().setDelay(delay).build()).
-279          getResponse());
-280      } catch (ServiceException e) {
-281        throw new RuntimeException(e);
-282      }
-283      if (results != null) {
-284        synchronized (results) {
-285          results.add(result);
-286        }
-287      }
-288    }
-289  }
-290
-291  @Test
-292  public void testEndDelayThrowing() throws IOException {
-293    Configuration conf = HBaseConfiguration.create();
-294    InetSocketAddress isa = new InetSocketAddress("localhost", 0);
-295    FaultyTestDelayedImplementation instance = new FaultyTestDelayedImplementation();
-296    BlockingService service =
-297      TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
-298    rpcServer = new RpcServer(null, "testEndDelayThrowing",
-299        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
-300        isa,
-301        conf,
-302        new FifoRpcScheduler(conf, 1));
-303    rpcServer.start();
-304    RpcClient rpcClient = RpcClientFactory.createClient(
-305        conf, HConstants.DEFAULT_CLUSTER_ID.toString());
-306    try {
-307      InetSocketAddress address = rpcServer.getListenerAddress();
-308      if (address == null) {
-309        throw new IOException("Listener channel is closed");
-310      }
-311      BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
-312          ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
-313        User.getCurrent(), 1000);
-314      TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
-315        TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
-316
-317      int result = 0xDEADBEEF;
-318
-319      try {
-320        result = stub.test(null, TestArg.newBuilder().setDelay(false).build()).getResponse();
-321      } catch (Exception e) {
-322        fail("No exception should have been thrown.");
-323      }
-324      assertEquals(result, UNDELAYED);
-325
-326      boolean caughtException = false;
-327      try {
-328        result = stub.test(null, TestArg.newBuilder().setDelay(true).build()).getResponse();
-329      } catch(Exception e) {
-330        // Exception thrown by server is enclosed in a RemoteException.
-331        if (e.getCause().getMessage().contains("java.lang.Exception: Something went wrong")) {
-332          caughtException = true;
-333        }
-334        LOG.warn("Caught exception, expected=" + caughtException);
-335      }
-336      assertTrue(caughtException);
-337    } finally {
-338      rpcClient.close();
-339    }
-340  }
-341
-342  /**
-343   * Delayed calls to this class throw an exception.
-344   */
-345  private static class FaultyTestDelayedImplementation extends TestDelayedImplementation {
-346    public FaultyTestDelayedImplementation() {
-347      super(false);
-348    }
-349
-350    @Override
-351    public TestResponse test(RpcController rpcController, TestArg arg)
-352    throws ServiceException {
-353      LOG.info("In faulty test, delay=" + arg.getDelay());
-354      if (!arg.getDelay()) return TestResponse.newBuilder().setResponse(UNDELAYED).build();
-355      Delayable call = RpcServer.getCurrentCall();
-356      call.startDelay(true);
-357      LOG.info("In faulty test, delaying");
-358      try {
-359        call.endDelayThrowing(new Exception("Something went wrong"));
-360      } catch (IOException e) {
-361        e.printStackTrace();
-362      }
-363      // Client will receive the Exception, not this value.
-364      return TestResponse.newBuilder().setResponse(DELAYED).build();
-365    }
-366  }
-367}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- - http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6d411951/testdevapidocs/src-html/org/apache/hadoop/hbase/ipc/TestDelayedRpc.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/ipc/TestDelayedRpc.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/ipc/TestDelayedRpc.html deleted file mode 100644 index c6a4fc3..0000000 --- a/testdevapidocs/src-html/org/apache/hadoop/hbase/ipc/TestDelayedRpc.html +++ /dev/null @@ -1,439 +0,0 @@ - - - -Source code - - - -
-
001/**
-002 *
-003 * Licensed to the Apache Software Foundation (ASF) under one
-004 * or more contributor license agreements.  See the NOTICE file
-005 * distributed with this work for additional information
-006 * regarding copyright ownership.  The ASF licenses this file
-007 * to you under the Apache License, Version 2.0 (the
-008 * "License"); you may not use this file except in compliance
-009 * with the License.  You may obtain a copy of the License at
-010 *
-011 *     http://www.apache.org/licenses/LICENSE-2.0
-012 *
-013 * Unless required by applicable law or agreed to in writing, software
-014 * distributed under the License is distributed on an "AS IS" BASIS,
-015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-016 * See the License for the specific language governing permissions and
-017 * limitations under the License.
-018 */
-019package org.apache.hadoop.hbase.ipc;
-020
-021import static org.junit.Assert.assertEquals;
-022import static org.junit.Assert.assertFalse;
-023import static org.junit.Assert.assertTrue;
-024import static org.junit.Assert.fail;
-025
-026import java.io.IOException;
-027import java.net.InetSocketAddress;
-028import java.util.ArrayList;
-029import java.util.List;
-030
-031import org.apache.commons.logging.Log;
-032import org.apache.commons.logging.LogFactory;
-033import org.apache.hadoop.conf.Configuration;
-034import org.apache.hadoop.hbase.HBaseConfiguration;
-035import org.apache.hadoop.hbase.HConstants;
-036import org.apache.hadoop.hbase.testclassification.MediumTests;
-037import org.apache.hadoop.hbase.testclassification.RPCTests;
-038import org.apache.hadoop.hbase.ServerName;
-039import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
-040import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg;
-041import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse;
-042import org.apache.hadoop.hbase.security.User;
-043import org.apache.log4j.AppenderSkeleton;
-044import org.apache.log4j.Level;
-045import org.apache.log4j.Logger;
-046import org.apache.log4j.spi.LoggingEvent;
-047import org.junit.Test;
-048import org.junit.experimental.categories.Category;
-049
-050import com.google.common.collect.Lists;
-051import com.google.protobuf.BlockingRpcChannel;
-052import com.google.protobuf.BlockingService;
-053import com.google.protobuf.RpcController;
-054import com.google.protobuf.ServiceException;
-055
-056/**
-057 * Test that delayed RPCs work. Fire up three calls, the first of which should
-058 * be delayed. Check that the last two, which are undelayed, return before the
-059 * first one.
-060 */
-061@Category({RPCTests.class, MediumTests.class}) // Fails sometimes with small tests
-062public class TestDelayedRpc {
-063  private static final Log LOG = LogFactory.getLog(TestDelayedRpc.class);
-064  public static RpcServerInterface rpcServer;
-065  public static final int UNDELAYED = 0;
-066  public static final int DELAYED = 1;
-067  private static final int RPC_CLIENT_TIMEOUT = 30000;
-068
-069  @Test (timeout=60000)
-070  public void testDelayedRpcImmediateReturnValue() throws Exception {
-071    testDelayedRpc(false);
-072  }
-073
-074  @Test (timeout=60000)
-075  public void testDelayedRpcDelayedReturnValue() throws Exception {
-076    testDelayedRpc(true);
-077  }
-078
-079  private void testDelayedRpc(boolean delayReturnValue) throws Exception {
-080    LOG.info("Running testDelayedRpc delayReturnValue=" + delayReturnValue);
-081    Configuration conf = HBaseConfiguration.create();
-082    InetSocketAddress isa = new InetSocketAddress("localhost", 0);
-083    TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
-084    BlockingService service =
-085      TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
-086    rpcServer = new RpcServer(null, "testDelayedRpc",
-087        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
-088        isa,
-089        conf,
-090        new FifoRpcScheduler(conf, 1));
-091    rpcServer.start();
-092    RpcClient rpcClient = RpcClientFactory.createClient(
-093        conf, HConstants.DEFAULT_CLUSTER_ID.toString());
-094    try {
-095      InetSocketAddress address = rpcServer.getListenerAddress();
-096      if (address == null) {
-097        throw new IOException("Listener channel is closed");
-098      }
-099      BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
-100          ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
-101          User.getCurrent(), RPC_CLIENT_TIMEOUT);
-102      TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
-103        TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
-104      List<Integer> results = new ArrayList<Integer>();
-105      // Setting true sets 'delayed' on the client.
-106      TestThread th1 = new TestThread(stub, true, results);
-107      // Setting 'false' means we will return UNDELAYED as response immediately.
-108      TestThread th2 = new TestThread(stub, false, results);
-109      TestThread th3 = new TestThread(stub, false, results);
-110      th1.start();
-111      Thread.sleep(100);
-112      th2.start();
-113      Thread.sleep(200);
-114      th3.start();
-115
-116      th1.join();
-117      th2.join();
-118      th3.join();
-119
-120      // We should get the two undelayed responses first.
-121      assertEquals(UNDELAYED, results.get(0).intValue());
-122      assertEquals(UNDELAYED, results.get(1).intValue());
-123      assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :  0xDEADBEEF);
-124    } finally {
-125      rpcClient.close();
-126    }
-127  }
-128
-129  private static class ListAppender extends AppenderSkeleton {
-130    private final List<String> messages = new ArrayList<String>();
-131
-132    @Override
-133    protected void append(LoggingEvent event) {
-134      messages.add(event.getMessage().toString());
-135    }
-136
-137    @Override
-138    public void close() {
-139    }
-140
-141    @Override
-142    public boolean requiresLayout() {
-143      return false;
-144    }
-145
-146    public List<String> getMessages() {
-147      return messages;
-148    }
-149  }
-150
-151  /**
-152   * Tests that we see a WARN message in the logs.
-153   * @throws Exception
-154   */
-155  @Test (timeout=60000)
-156  public void testTooManyDelayedRpcs() throws Exception {
-157    Configuration conf = HBaseConfiguration.create();
-158    final int MAX_DELAYED_RPC = 10;
-159    conf.setInt("hbase.ipc.warn.delayedrpc.number", MAX_DELAYED_RPC);
-160    // Set up an appender to catch the "Too many delayed calls" that we expect.
-161    ListAppender listAppender = new ListAppender();
-162    Logger log = Logger.getLogger(RpcServer.class);
-163    log.addAppender(listAppender);
-164    log.setLevel(Level.WARN);
-165
-166
-167    InetSocketAddress isa = new InetSocketAddress("localhost", 0);
-168    TestDelayedImplementation instance = new TestDelayedImplementation(true);
-169    BlockingService service =
-170      TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
-171    rpcServer = new RpcServer(null, "testTooManyDelayedRpcs",
-172      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
-173        isa,
-174        conf,
-175        new FifoRpcScheduler(conf, 1));
-176    rpcServer.start();
-177    RpcClient rpcClient = RpcClientFactory.createClient(
-178        conf, HConstants.DEFAULT_CLUSTER_ID.toString());
-179    try {
-180      InetSocketAddress address = rpcServer.getListenerAddress();
-181      if (address == null) {
-182        throw new IOException("Listener channel is closed");
-183      }
-184      BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
-185          ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
-186          User.getCurrent(), RPC_CLIENT_TIMEOUT);
-187      TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
-188        TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
-189      Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
-190      for (int i = 0; i < MAX_DELAYED_RPC; i++) {
-191        threads[i] = new TestThread(stub, true, null);
-192        threads[i].start();
-193      }
-194
-195      /* No warnings till here. */
-196      assertTrue(listAppender.getMessages().isEmpty());
-197
-198      /* This should give a warning. */
-199      threads[MAX_DELAYED_RPC] = new TestThread(stub, true, null);
-200      threads[MAX_DELAYED_RPC].start();
-201
-202      for (int i = 0; i < MAX_DELAYED_RPC; i++) {
-203        threads[i].join();
-204      }
-205
-206      assertFalse(listAppender.getMessages().isEmpty());
-207      assertTrue(listAppender.getMessages().get(0).startsWith("Too many delayed calls"));
-208
-209      log.removeAppender(listAppender);
-210    } finally {
-211      rpcClient.close();
-212    }
-213  }
-214
-215  public static class TestDelayedImplementation
-216  implements TestDelayedRpcProtos.TestDelayedService.BlockingInterface {
-217    /**
-218     * Should the return value of delayed call be set at the end of the delay
-219     * or at call return.
-220     */
-221    private final boolean delayReturnValue;
-222
-223    /**
-224     * @param delayReturnValue Should the response to the delayed call be set
-225     * at the start or the end of the delay.
-226     */
-227    public TestDelayedImplementation(boolean delayReturnValue) {
-228      this.delayReturnValue = delayReturnValue;
-229    }
-230
-231    @Override
-232    public TestResponse test(final RpcController rpcController, final TestArg testArg)
-233    throws ServiceException {
-234      boolean delay = testArg.getDelay();
-235      TestResponse.Builder responseBuilder = TestResponse.newBuilder();
-236      if (!delay) {
-237        responseBuilder.setResponse(UNDELAYED);
-238        return responseBuilder.build();
-239      }
-240      final Delayable call = RpcServer.getCurrentCall();
-241      call.startDelay(delayReturnValue);
-242      new Thread() {
-243        @Override
-244        public void run() {
-245          try {
-246            Thread.sleep(500);
-247            TestResponse.Builder responseBuilder = TestResponse.newBuilder();
-248            call.endDelay(delayReturnValue ?
-249                responseBuilder.setResponse(DELAYED).build() : null);
-250          } catch (Exception e) {
-251            e.printStackTrace();
-252          }
-253        }
-254      }.start();
-255      // This value should go back to client only if the response is set
-256      // immediately at delay time.
-257      responseBuilder.setResponse(0xDEADBEEF);
-258      return responseBuilder.build();
-259    }
-260  }
-261
-262  public static class TestThread extends Thread {
-263    private final TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub;
-264    private final boolean delay;
-265    private final List<Integer> results;
-266
-267    public TestThread(TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub,
-268        boolean delay, List<Integer> results) {
-269      this.stub = stub;
-270      this.delay = delay;
-271      this.results = results;
-272    }
-273
-274    @Override
-275    public void run() {
-276      Integer result;
-277      try {
-278        result = new Integer(stub.test(null, TestArg.newBuilder().setDelay(delay).build()).
-279          getResponse());
-280      } catch (ServiceException e) {
-281        throw new RuntimeException(e);
-282      }
-283      if (results != null) {
-284        synchronized (results) {
-285          results.add(result);
-286        }
-287      }
-288    }
-289  }
-290
-291  @Test
-292  public void testEndDelayThrowing() throws IOException {
-293    Configuration conf = HBaseConfiguration.create();
-294    InetSocketAddress isa = new InetSocketAddress("localhost", 0);
-295    FaultyTestDelayedImplementation instance = new FaultyTestDelayedImplementation();
-296    BlockingService service =
-297      TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
-298    rpcServer = new RpcServer(null, "testEndDelayThrowing",
-299        Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
-300        isa,
-301        conf,
-302        new FifoRpcScheduler(conf, 1));
-303    rpcServer.start();
-304    RpcClient rpcClient = RpcClientFactory.createClient(
-305        conf, HConstants.DEFAULT_CLUSTER_ID.toString());
-306    try {
-307      InetSocketAddress address = rpcServer.getListenerAddress();
-308      if (address == null) {
-309        throw new IOException("Listener channel is closed");
-310      }
-311      BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
-312          ServerName.valueOf(address.getHostName(),address.getPort(), System.currentTimeMillis()),
-313        User.getCurrent(), 1000);
-314      TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub =
-315        TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
-316
-317      int result = 0xDEADBEEF;
-318
-319      try {
-320        result = stub.test(null, TestArg.newBuilder().setDelay(false).build()).getResponse();
-321      } catch (Exception e) {
-322        fail("No exception should have been thrown.");
-323      }
-324      assertEquals(result, UNDELAYED);
-325
-326      boolean caughtException = false;
-327      try {
-328        result = stub.test(null, TestArg.newBuilder().setDelay(true).build()).getResponse();
-329      } catch(Exception e) {
-330        // Exception thrown by server is enclosed in a RemoteException.
-331        if (e.getCause().getMessage().contains("java.lang.Exception: Something went wrong")) {
-332          caughtException = true;
-333        }
-334        LOG.warn("Caught exception, expected=" + caughtException);
-335      }
-336      assertTrue(caughtException);
-337    } finally {
-338      rpcClient.close();
-339    }
-340  }
-341
-342  /**
-343   * Delayed calls to this class throw an exception.
-344   */
-345  private static class FaultyTestDelayedImplementation extends TestDelayedImplementation {
-346    public FaultyTestDelayedImplementation() {
-347      super(false);
-348    }
-349
-350    @Override
-351    public TestResponse test(RpcController rpcController, TestArg arg)
-352    throws ServiceException {
-353      LOG.info("In faulty test, delay=" + arg.getDelay());
-354      if (!arg.getDelay()) return TestResponse.newBuilder().setResponse(UNDELAYED).build();
-355      Delayable call = RpcServer.getCurrentCall();
-356      call.startDelay(true);
-357      LOG.info("In faulty test, delaying");
-358      try {
-359        call.endDelayThrowing(new Exception("Something went wrong"));
-360      } catch (IOException e) {
-361        e.printStackTrace();
-362      }
-363      // Client will receive the Exception, not this value.
-364      return TestResponse.newBuilder().setResponse(DELAYED).build();
-365    }
-366  }
-367}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- -