Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9A3B6200C7F for ; Tue, 9 May 2017 17:00:28 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 98D2E160BCD; Tue, 9 May 2017 15:00:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4E543160BC3 for ; Tue, 9 May 2017 17:00:26 +0200 (CEST) Received: (qmail 75662 invoked by uid 500); 9 May 2017 15:00:22 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 73868 invoked by uid 99); 9 May 2017 15:00:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 May 2017 15:00:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4B867F1756; Tue, 9 May 2017 15:00:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Tue, 09 May 2017 15:00:33 -0000 Message-Id: <432be6fdec234eb1b1eed227507fcb89@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [15/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. archived-at: Tue, 09 May 2017 15:00:28 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/7ef4c5a9/devapidocs/src-html/org/apache/hadoop/hbase/ipc/SimpleRpcServer.Call.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/SimpleRpcServer.Call.html b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/SimpleRpcServer.Call.html deleted file mode 100644 index 2987e7b..0000000 --- a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/SimpleRpcServer.Call.html +++ /dev/null @@ -1,1500 +0,0 @@ - - - -Source code - - - -
-
001/**
-002 * Licensed to the Apache Software Foundation (ASF) under one
-003 * or more contributor license agreements.  See the NOTICE file
-004 * distributed with this work for additional information
-005 * regarding copyright ownership.  The ASF licenses this file
-006 * to you under the Apache License, Version 2.0 (the
-007 * "License"); you may not use this file except in compliance
-008 * with the License.  You may obtain a copy of the License at
-009 *
-010 *     http://www.apache.org/licenses/LICENSE-2.0
-011 *
-012 * Unless required by applicable law or agreed to in writing, software
-013 * distributed under the License is distributed on an "AS IS" BASIS,
-014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-015 * See the License for the specific language governing permissions and
-016 * limitations under the License.
-017 */
-018
-019package org.apache.hadoop.hbase.ipc;
-020
-021import java.io.IOException;
-022import java.io.InputStream;
-023import java.net.BindException;
-024import java.net.InetAddress;
-025import java.net.InetSocketAddress;
-026import java.net.ServerSocket;
-027import java.net.Socket;
-028import java.net.SocketException;
-029import java.net.UnknownHostException;
-030import java.nio.ByteBuffer;
-031import java.nio.channels.CancelledKeyException;
-032import java.nio.channels.ClosedChannelException;
-033import java.nio.channels.GatheringByteChannel;
-034import java.nio.channels.ReadableByteChannel;
-035import java.nio.channels.SelectionKey;
-036import java.nio.channels.Selector;
-037import java.nio.channels.ServerSocketChannel;
-038import java.nio.channels.SocketChannel;
-039import java.util.ArrayList;
-040import java.util.Arrays;
-041import java.util.Collections;
-042import java.util.Iterator;
-043import java.util.List;
-044import java.util.Set;
-045import java.util.Timer;
-046import java.util.TimerTask;
-047import java.util.concurrent.ConcurrentHashMap;
-048import java.util.concurrent.ConcurrentLinkedDeque;
-049import java.util.concurrent.ExecutorService;
-050import java.util.concurrent.Executors;
-051import java.util.concurrent.LinkedBlockingQueue;
-052import java.util.concurrent.atomic.AtomicInteger;
-053import java.util.concurrent.atomic.LongAdder;
-054import java.util.concurrent.locks.Lock;
-055import java.util.concurrent.locks.ReentrantLock;
-056
-057import org.apache.hadoop.conf.Configuration;
-058import org.apache.hadoop.hbase.CellScanner;
-059import org.apache.hadoop.hbase.DoNotRetryIOException;
-060import org.apache.hadoop.hbase.HBaseIOException;
-061import org.apache.hadoop.hbase.HConstants;
-062import org.apache.hadoop.hbase.Server;
-063import org.apache.hadoop.hbase.classification.InterfaceAudience;
-064import org.apache.hadoop.hbase.classification.InterfaceStability;
-065import org.apache.hadoop.hbase.client.VersionInfoUtil;
-066import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
-067import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-068import org.apache.hadoop.hbase.nio.ByteBuff;
-069import org.apache.hadoop.hbase.nio.SingleByteBuff;
-070import org.apache.hadoop.hbase.security.AccessDeniedException;
-071import org.apache.hadoop.hbase.security.AuthMethod;
-072import org.apache.hadoop.hbase.security.HBasePolicyProvider;
-073import org.apache.hadoop.hbase.security.SaslStatus;
-074import org.apache.hadoop.hbase.security.SaslUtil;
-075import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
-076import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
-077import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
-078import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-079import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-080import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-081import org.apache.hadoop.hbase.util.Bytes;
-082import org.apache.hadoop.hbase.util.Pair;
-083import org.apache.hadoop.hbase.util.Threads;
-084import org.apache.hadoop.io.IOUtils;
-085import org.apache.hadoop.io.IntWritable;
-086import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-087import org.apache.hadoop.util.StringUtils;
-088import org.apache.htrace.TraceInfo;
-089
-090import com.google.common.util.concurrent.ThreadFactoryBuilder;
-091
-092/**
-093 * The RPC server with native java NIO implementation deriving from Hadoop to
-094 * host protobuf described Services. It's the original one before HBASE-17262,
-095 * and the default RPC server for now.
-096 *
-097 * An RpcServer instance has a Listener that hosts the socket.  Listener has fixed number
-098 * of Readers in an ExecutorPool, 10 by default.  The Listener does an accept and then
-099 * round robin a Reader is chosen to do the read.  The reader is registered on Selector.  Read does
-100 * total read off the channel and the parse from which it makes a Call.  The call is wrapped in a
-101 * CallRunner and passed to the scheduler to be run.  Reader goes back to see if more to be done
-102 * and loops till done.
-103 *
-104 * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
-105 * has given the queues into which calls (i.e. CallRunner instances) are inserted.  Handlers run
-106 * taking from the queue.  They run the CallRunner#run method on each item gotten from queue
-107 * and keep taking while the server is up.
-108 *
-109 * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
-110 * queue for Responder to pull from and return result to client.
-111 *
-112 * @see BlockingRpcClient
-113 */
-114@InterfaceAudience.Private
-115@InterfaceStability.Evolving
-116public class SimpleRpcServer extends RpcServer {
-117
-118  protected int port;                             // port we listen on
-119  protected InetSocketAddress address;            // inet address we listen on
-120  private int readThreads;                        // number of read threads
-121
-122  protected int socketSendBufferSize;
-123  protected final long purgeTimeout;    // in milliseconds
-124
-125  // maintains the set of client connections and handles idle timeouts
-126  private ConnectionManager connectionManager;
-127  private Listener listener = null;
-128  protected Responder responder = null;
-129
-130  /**
-131   * Datastructure that holds all necessary to a method invocation and then afterward, carries
-132   * the result.
-133   */
-134  @InterfaceStability.Evolving
-135  public class Call extends RpcServer.Call {
-136
-137    protected Responder responder;
-138
-139    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
-140        justification="Can't figure why this complaint is happening... see below")
-141    Call(int id, final BlockingService service, final MethodDescriptor md,
-142        RequestHeader header, Message param, CellScanner cellScanner,
-143        RpcServer.Connection connection, long size, TraceInfo tinfo,
-144        final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup,
-145        Responder responder) {
-146      super(id, service, md, header, param, cellScanner, connection, size,
-147          tinfo, remoteAddress, timeout, reqCleanup);
-148      this.responder = responder;
-149    }
-150
-151    /**
-152     * Call is done. Execution happened and we returned results to client. It is now safe to
-153     * cleanup.
-154     */
-155    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
-156        justification="Presume the lock on processing request held by caller is protection enough")
-157    @Override
-158    void done() {
-159      super.done();
-160      this.getConnection().decRpcCount(); // Say that we're done with this call.
-161    }
-162
-163    @Override
-164    public long disconnectSince() {
-165      if (!getConnection().isConnectionOpen()) {
-166        return System.currentTimeMillis() - timestamp;
-167      } else {
-168        return -1L;
-169      }
-170    }
-171
-172    @Override
-173    public synchronized void sendResponseIfReady() throws IOException {
-174      // set param null to reduce memory pressure
-175      this.param = null;
-176      this.responder.doRespond(this);
-177    }
-178
-179    Connection getConnection() {
-180      return (Connection) this.connection;
-181    }
-182
-183  }
-184
-185  /** Listens on the socket. Creates jobs for the handler threads*/
-186  private class Listener extends Thread {
-187
-188    private ServerSocketChannel acceptChannel = null; //the accept channel
-189    private Selector selector = null; //the selector that we use for the server
-190    private Reader[] readers = null;
-191    private int currentReader = 0;
-192    private final int readerPendingConnectionQueueLength;
-193
-194    private ExecutorService readPool;
-195
-196    public Listener(final String name) throws IOException {
-197      super(name);
-198      // The backlog of requests that we will have the serversocket carry.
-199      int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
-200      readerPendingConnectionQueueLength =
-201          conf.getInt("hbase.ipc.server.read.connection-queue.size", 100);
-202      // Create a new server socket and set to non blocking mode
-203      acceptChannel = ServerSocketChannel.open();
-204      acceptChannel.configureBlocking(false);
-205
-206      // Bind the server socket to the binding addrees (can be different from the default interface)
-207      bind(acceptChannel.socket(), bindAddress, backlogLength);
-208      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
-209      address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
-210      // create a selector;
-211      selector = Selector.open();
-212
-213      readers = new Reader[readThreads];
-214      // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
-215      // has an advantage in that it is easy to shutdown the pool.
-216      readPool = Executors.newFixedThreadPool(readThreads,
-217        new ThreadFactoryBuilder().setNameFormat(
-218          "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
-219          ",port=" + port).setDaemon(true)
-220        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
-221      for (int i = 0; i < readThreads; ++i) {
-222        Reader reader = new Reader();
-223        readers[i] = reader;
-224        readPool.execute(reader);
-225      }
-226      LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
-227
-228      // Register accepts on the server socket with the selector.
-229      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
-230      this.setName("RpcServer.listener,port=" + port);
-231      this.setDaemon(true);
-232    }
-233
-234
-235    private class Reader implements Runnable {
-236      final private LinkedBlockingQueue<Connection> pendingConnections;
-237      private final Selector readSelector;
-238
-239      Reader() throws IOException {
-240        this.pendingConnections = new LinkedBlockingQueue<>(readerPendingConnectionQueueLength);
-241        this.readSelector = Selector.open();
-242      }
-243
-244      @Override
-245      public void run() {
-246        try {
-247          doRunLoop();
-248        } finally {
-249          try {
-250            readSelector.close();
-251          } catch (IOException ioe) {
-252            LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
-253          }
-254        }
-255      }
-256
-257      private synchronized void doRunLoop() {
-258        while (running) {
-259          try {
-260            // Consume as many connections as currently queued to avoid
-261            // unbridled acceptance of connections that starves the select
-262            int size = pendingConnections.size();
-263            for (int i=size; i>0; i--) {
-264              Connection conn = pendingConnections.take();
-265              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
-266            }
-267            readSelector.select();
-268            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
-269            while (iter.hasNext()) {
-270              SelectionKey key = iter.next();
-271              iter.remove();
-272              if (key.isValid()) {
-273                if (key.isReadable()) {
-274                  doRead(key);
-275                }
-276              }
-277              key = null;
-278            }
-279          } catch (InterruptedException e) {
-280            if (running) {                      // unexpected -- log it
-281              LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
-282            }
-283          } catch (CancelledKeyException e) {
-284            LOG.error(getName() + ": CancelledKeyException in Reader", e);
-285          } catch (IOException ex) {
-286            LOG.info(getName() + ": IOException in Reader", ex);
-287          }
-288        }
-289      }
-290
-291      /**
-292       * Updating the readSelector while it's being used is not thread-safe,
-293       * so the connection must be queued.  The reader will drain the queue
-294       * and update its readSelector before performing the next select
-295       */
-296      public void addConnection(Connection conn) throws IOException {
-297        pendingConnections.add(conn);
-298        readSelector.wakeup();
-299      }
-300    }
-301
-302    @Override
-303    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
-304      justification="selector access is not synchronized; seems fine but concerned changing " +
-305        "it will have per impact")
-306    public void run() {
-307      LOG.info(getName() + ": starting");
-308      connectionManager.startIdleScan();
-309      while (running) {
-310        SelectionKey key = null;
-311        try {
-312          selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
-313          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
-314          while (iter.hasNext()) {
-315            key = iter.next();
-316            iter.remove();
-317            try {
-318              if (key.isValid()) {
-319                if (key.isAcceptable())
-320                  doAccept(key);
-321              }
-322            } catch (IOException ignored) {
-323              if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
-324            }
-325            key = null;
-326          }
-327        } catch (OutOfMemoryError e) {
-328          if (errorHandler != null) {
-329            if (errorHandler.checkOOME(e)) {
-330              LOG.info(getName() + ": exiting on OutOfMemoryError");
-331              closeCurrentConnection(key, e);
-332              connectionManager.closeIdle(true);
-333              return;
-334            }
-335          } else {
-336            // we can run out of memory if we have too many threads
-337            // log the event and sleep for a minute and give
-338            // some thread(s) a chance to finish
-339            LOG.warn(getName() + ": OutOfMemoryError in server select", e);
-340            closeCurrentConnection(key, e);
-341            connectionManager.closeIdle(true);
-342            try {
-343              Thread.sleep(60000);
-344            } catch (InterruptedException ex) {
-345              LOG.debug("Interrupted while sleeping");
-346            }
-347          }
-348        } catch (Exception e) {
-349          closeCurrentConnection(key, e);
-350        }
-351      }
-352      LOG.info(getName() + ": stopping");
-353      synchronized (this) {
-354        try {
-355          acceptChannel.close();
-356          selector.close();
-357        } catch (IOException ignored) {
-358          if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
-359        }
-360
-361        selector= null;
-362        acceptChannel= null;
-363
-364        // close all connections
-365        connectionManager.stopIdleScan();
-366        connectionManager.closeAll();
-367      }
-368    }
-369
-370    private void closeCurrentConnection(SelectionKey key, Throwable e) {
-371      if (key != null) {
-372        Connection c = (Connection)key.attachment();
-373        if (c != null) {
-374          closeConnection(c);
-375          key.attach(null);
-376        }
-377      }
-378    }
-379
-380    InetSocketAddress getAddress() {
-381      return address;
-382    }
-383
-384    void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
-385      ServerSocketChannel server = (ServerSocketChannel) key.channel();
-386      SocketChannel channel;
-387      while ((channel = server.accept()) != null) {
-388        channel.configureBlocking(false);
-389        channel.socket().setTcpNoDelay(tcpNoDelay);
-390        channel.socket().setKeepAlive(tcpKeepAlive);
-391        Reader reader = getReader();
-392        Connection c = connectionManager.register(channel);
-393        // If the connectionManager can't take it, close the connection.
-394        if (c == null) {
-395          if (channel.isOpen()) {
-396            IOUtils.cleanup(null, channel);
-397          }
-398          continue;
-399        }
-400        key.attach(c);  // so closeCurrentConnection can get the object
-401        reader.addConnection(c);
-402      }
-403    }
-404
-405    void doRead(SelectionKey key) throws InterruptedException {
-406      int count;
-407      Connection c = (Connection) key.attachment();
-408      if (c == null) {
-409        return;
-410      }
-411      c.setLastContact(System.currentTimeMillis());
-412      try {
-413        count = c.readAndProcess();
-414      } catch (InterruptedException ieo) {
-415        LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
-416        throw ieo;
-417      } catch (Exception e) {
-418        if (LOG.isDebugEnabled()) {
-419          LOG.debug(getName() + ": Caught exception while reading:", e);
-420        }
-421        count = -1; //so that the (count < 0) block is executed
-422      }
-423      if (count < 0) {
-424        closeConnection(c);
-425        c = null;
-426      } else {
-427        c.setLastContact(System.currentTimeMillis());
-428      }
-429    }
-430
-431    synchronized void doStop() {
-432      if (selector != null) {
-433        selector.wakeup();
-434        Thread.yield();
-435      }
-436      if (acceptChannel != null) {
-437        try {
-438          acceptChannel.socket().close();
-439        } catch (IOException e) {
-440          LOG.info(getName() + ": exception in closing listener socket. " + e);
-441        }
-442      }
-443      readPool.shutdownNow();
-444    }
-445
-446    // The method that will return the next reader to work with
-447    // Simplistic implementation of round robin for now
-448    Reader getReader() {
-449      currentReader = (currentReader + 1) % readers.length;
-450      return readers[currentReader];
-451    }
-452  }
-453
-454  // Sends responses of RPC back to clients.
-455  protected class Responder extends Thread {
-456    private final Selector writeSelector;
-457    private final Set<Connection> writingCons =
-458        Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
-459
-460    Responder() throws IOException {
-461      this.setName("RpcServer.responder");
-462      this.setDaemon(true);
-463      this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
-464      writeSelector = Selector.open(); // create a selector
-465    }
-466
-467    @Override
-468    public void run() {
-469      LOG.debug(getName() + ": starting");
-470      try {
-471        doRunLoop();
-472      } finally {
-473        LOG.info(getName() + ": stopping");
-474        try {
-475          writeSelector.close();
-476        } catch (IOException ioe) {
-477          LOG.error(getName() + ": couldn't close write selector", ioe);
-478        }
-479      }
-480    }
-481
-482    /**
-483     * Take the list of the connections that want to write, and register them
-484     * in the selector.
-485     */
-486    private void registerWrites() {
-487      Iterator<Connection> it = writingCons.iterator();
-488      while (it.hasNext()) {
-489        Connection c = it.next();
-490        it.remove();
-491        SelectionKey sk = c.channel.keyFor(writeSelector);
-492        try {
-493          if (sk == null) {
-494            try {
-495              c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
-496            } catch (ClosedChannelException e) {
-497              // ignore: the client went away.
-498              if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
-499            }
-500          } else {
-501            sk.interestOps(SelectionKey.OP_WRITE);
-502          }
-503        } catch (CancelledKeyException e) {
-504          // ignore: the client went away.
-505          if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
-506        }
-507      }
-508    }
-509
-510    /**
-511     * Add a connection to the list that want to write,
-512     */
-513    public void registerForWrite(Connection c) {
-514      if (writingCons.add(c)) {
-515        writeSelector.wakeup();
-516      }
-517    }
-518
-519    private void doRunLoop() {
-520      long lastPurgeTime = 0;   // last check for old calls.
-521      while (running) {
-522        try {
-523          registerWrites();
-524          int keyCt = writeSelector.select(purgeTimeout);
-525          if (keyCt == 0) {
-526            continue;
-527          }
-528
-529          Set<SelectionKey> keys = writeSelector.selectedKeys();
-530          Iterator<SelectionKey> iter = keys.iterator();
-531          while (iter.hasNext()) {
-532            SelectionKey key = iter.next();
-533            iter.remove();
-534            try {
-535              if (key.isValid() && key.isWritable()) {
-536                doAsyncWrite(key);
-537              }
-538            } catch (IOException e) {
-539              LOG.debug(getName() + ": asyncWrite", e);
-540            }
-541          }
-542
-543          lastPurgeTime = purge(lastPurgeTime);
-544
-545        } catch (OutOfMemoryError e) {
-546          if (errorHandler != null) {
-547            if (errorHandler.checkOOME(e)) {
-548              LOG.info(getName() + ": exiting on OutOfMemoryError");
-549              return;
-550            }
-551          } else {
-552            //
-553            // we can run out of memory if we have too many threads
-554            // log the event and sleep for a minute and give
-555            // some thread(s) a chance to finish
-556            //
-557            LOG.warn(getName() + ": OutOfMemoryError in server select", e);
-558            try {
-559              Thread.sleep(60000);
-560            } catch (InterruptedException ex) {
-561              LOG.debug("Interrupted while sleeping");
-562              return;
-563            }
-564          }
-565        } catch (Exception e) {
-566          LOG.warn(getName() + ": exception in Responder " +
-567              StringUtils.stringifyException(e), e);
-568        }
-569      }
-570      LOG.info(getName() + ": stopped");
-571    }
-572
-573    /**
-574     * If there were some calls that have not been sent out for a
-575     * long time, we close the connection.
-576     * @return the time of the purge.
-577     */
-578    private long purge(long lastPurgeTime) {
-579      long now = System.currentTimeMillis();
-580      if (now < lastPurgeTime + purgeTimeout) {
-581        return lastPurgeTime;
-582      }
-583
-584      ArrayList<Connection> conWithOldCalls = new ArrayList<>();
-585      // get the list of channels from list of keys.
-586      synchronized (writeSelector.keys()) {
-587        for (SelectionKey key : writeSelector.keys()) {
-588          Connection connection = (Connection) key.attachment();
-589          if (connection == null) {
-590            throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
-591          }
-592          Call call = connection.responseQueue.peekFirst();
-593          if (call != null && now > call.timestamp + purgeTimeout) {
-594            conWithOldCalls.add(call.getConnection());
-595          }
-596        }
-597      }
-598
-599      // Seems safer to close the connection outside of the synchronized loop...
-600      for (Connection connection : conWithOldCalls) {
-601        closeConnection(connection);
-602      }
-603
-604      return now;
-605    }
-606
-607    private void doAsyncWrite(SelectionKey key) throws IOException {
-608      Connection connection = (Connection) key.attachment();
-609      if (connection == null) {
-610        throw new IOException("doAsyncWrite: no connection");
-611      }
-612      if (key.channel() != connection.channel) {
-613        throw new IOException("doAsyncWrite: bad channel");
-614      }
-615
-616      if (processAllResponses(connection)) {
-617        try {
-618          // We wrote everything, so we don't need to be told when the socket is ready for
-619          //  write anymore.
-620         key.interestOps(0);
-621        } catch (CancelledKeyException e) {
-622          /* The Listener/reader might have closed the socket.
-623           * We don't explicitly cancel the key, so not sure if this will
-624           * ever fire.
-625           * This warning could be removed.
-626           */
-627          LOG.warn("Exception while changing ops : " + e);
-628        }
-629      }
-630    }
-631
-632    /**
-633     * Process the response for this call. You need to have the lock on
-634     * {@link org.apache.hadoop.hbase.ipc.SimpleRpcServer.Connection#responseWriteLock}
-635     *
-636     * @param call the call
-637     * @return true if we proceed the call fully, false otherwise.
-638     * @throws IOException
-639     */
-640    private boolean processResponse(final Call call) throws IOException {
-641      boolean error = true;
-642      try {
-643        // Send as much data as we can in the non-blocking fashion
-644        long numBytes = channelWrite(call.getConnection().channel,
-645            call.response);
-646        if (numBytes < 0) {
-647          throw new HBaseIOException("Error writing on the socket " +
-648            "for the call:" + call.toShortString());
-649        }
-650        error = false;
-651      } finally {
-652        if (error) {
-653          LOG.debug(getName() + call.toShortString() + ": output error -- closing");
-654          // We will be closing this connection itself. Mark this call as done so that all the
-655          // buffer(s) it got from pool can get released
-656          call.done();
-657          closeConnection(call.getConnection());
-658        }
-659      }
-660
-661      if (!call.response.hasRemaining()) {
-662        call.done();
-663        return true;
-664      } else {
-665        return false; // Socket can't take more, we will have to come back.
-666      }
-667    }
-668
-669    /**
-670     * Process all the responses for this connection
-671     *
-672     * @return true if all the calls were processed or that someone else is doing it.
-673     * false if there * is still some work to do. In this case, we expect the caller to
-674     * delay us.
-675     * @throws IOException
-676     */
-677    private boolean processAllResponses(final Connection connection) throws IOException {
-678      // We want only one writer on the channel for a connection at a time.
-679      connection.responseWriteLock.lock();
-680      try {
-681        for (int i = 0; i < 20; i++) {
-682          // protection if some handlers manage to need all the responder
-683          Call call = connection.responseQueue.pollFirst();
-684          if (call == null) {
-685            return true;
-686          }
-687          if (!processResponse(call)) {
-688            connection.responseQueue.addFirst(call);
-689            return false;
-690          }
-691        }
-692      } finally {
-693        connection.responseWriteLock.unlock();
-694      }
-695
-696      return connection.responseQueue.isEmpty();
-697    }
-698
-699    //
-700    // Enqueue a response from the application.
-701    //
-702    void doRespond(Call call) throws IOException {
-703      boolean added = false;
-704
-705      // If there is already a write in progress, we don't wait. This allows to free the handlers
-706      //  immediately for other tasks.
-707      if (call.getConnection().responseQueue.isEmpty()
-708          && call.getConnection().responseWriteLock.tryLock()) {
-709        try {
-710          if (call.getConnection().responseQueue.isEmpty()) {
-711            // If we're alone, we can try to do a direct call to the socket. It's
-712            //  an optimisation to save on context switches and data transfer between cores..
-713            if (processResponse(call)) {
-714              return; // we're done.
-715            }
-716            // Too big to fit, putting ahead.
-717            call.getConnection().responseQueue.addFirst(call);
-718            added = true; // We will register to the selector later, outside of the lock.
-719          }
-720        } finally {
-721          call.getConnection().responseWriteLock.unlock();
-722        }
-723      }
-724
-725      if (!added) {
-726        call.getConnection().responseQueue.addLast(call);
-727      }
-728      call.responder.registerForWrite(call.getConnection());
-729
-730      // set the serve time when the response has to be sent later
-731      call.timestamp = System.currentTimeMillis();
-732    }
-733  }
-734
-735  /** Reads calls from a connection and queues them for handling. */
-736  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-737      value="VO_VOLATILE_INCREMENT",
-738      justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
-739  public class Connection extends RpcServer.Connection {
-740
-741    protected SocketChannel channel;
-742    private ByteBuff data;
-743    private ByteBuffer dataLengthBuffer;
-744    protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<>();
-745    private final Lock responseWriteLock = new ReentrantLock();
-746    private LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
-747    private long lastContact;
-748    protected Socket socket;
-749
-750    public Connection(SocketChannel channel, long lastContact) {
-751      super();
-752      this.channel = channel;
-753      this.lastContact = lastContact;
-754      this.data = null;
-755      this.dataLengthBuffer = ByteBuffer.allocate(4);
-756      this.socket = channel.socket();
-757      this.addr = socket.getInetAddress();
-758      if (addr == null) {
-759        this.hostAddress = "*Unknown*";
-760      } else {
-761        this.hostAddress = addr.getHostAddress();
-762      }
-763      this.remotePort = socket.getPort();
-764      if (socketSendBufferSize != 0) {
-765        try {
-766          socket.setSendBufferSize(socketSendBufferSize);
-767        } catch (IOException e) {
-768          LOG.warn("Connection: unable to set socket send buffer size to " +
-769                   socketSendBufferSize);
-770        }
-771      }
-772      this.saslCall = new Call(SASL_CALLID, null, null, null, null, null, this,
-773          0, null, null, 0, null, responder);
-774      this.setConnectionHeaderResponseCall = new Call(
-775          CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null,
-776          this, 0, null, null, 0, null, responder);
-777      this.authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null,
-778          null, null, null, this, 0, null, null, 0, null, responder);
-779    }
-780
-781    public void setLastContact(long lastContact) {
-782      this.lastContact = lastContact;
-783    }
-784
-785    public long getLastContact() {
-786      return lastContact;
-787    }
-788
-789    /* Return true if the connection has no outstanding rpc */
-790    private boolean isIdle() {
-791      return rpcCount.sum() == 0;
-792    }
-793
-794    /* Decrement the outstanding RPC count */
-795    protected void decRpcCount() {
-796      rpcCount.decrement();
-797    }
-798
-799    /* Increment the outstanding RPC count */
-800    protected void incRpcCount() {
-801      rpcCount.increment();
-802    }
-803
-804    private int readPreamble() throws IOException {
-805      int count;
-806      // Check for 'HBas' magic.
-807      this.dataLengthBuffer.flip();
-808      if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
-809        return doBadPreambleHandling("Expected HEADER=" +
-810            Bytes.toStringBinary(HConstants.RPC_HEADER) +
-811            " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
-812            " from " + toString());
-813      }
-814      // Now read the next two bytes, the version and the auth to use.
-815      ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
-816      count = channelRead(channel, versionAndAuthBytes);
-817      if (count < 0 || versionAndAuthBytes.remaining() > 0) {
-818        return count;
-819      }
-820      int version = versionAndAuthBytes.get(0);
-821      byte authbyte = versionAndAuthBytes.get(1);
-822      this.authMethod = AuthMethod.valueOf(authbyte);
-823      if (version != CURRENT_VERSION) {
-824        String msg = getFatalConnectionString(version, authbyte);
-825        return doBadPreambleHandling(msg, new WrongVersionException(msg));
-826      }
-827      if (authMethod == null) {
-828        String msg = getFatalConnectionString(version, authbyte);
-829        return doBadPreambleHandling(msg, new BadAuthException(msg));
-830      }
-831      if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
-832        if (allowFallbackToSimpleAuth) {
-833          metrics.authenticationFallback();
-834          authenticatedWithFallback = true;
-835        } else {
-836          AccessDeniedException ae = new AccessDeniedException("Authentication is required");
-837          setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
-838          authFailedCall.sendResponseIfReady();
-839          throw ae;
-840        }
-841      }
-842      if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
-843        doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
-844            SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
-845        authMethod = AuthMethod.SIMPLE;
-846        // client has already sent the initial Sasl message and we
-847        // should ignore it. Both client and server should fall back
-848        // to simple auth from now on.
-849        skipInitialSaslHandshake = true;
-850      }
-851      if (authMethod != AuthMethod.SIMPLE) {
-852        useSasl = true;
-853      }
-854
-855      dataLengthBuffer.clear();
-856      connectionPreambleRead = true;
-857      return count;
-858    }
-859
-860    private int read4Bytes() throws IOException {
-861      if (this.dataLengthBuffer.remaining() > 0) {
-862        return channelRead(channel, this.dataLengthBuffer);
-863      } else {
-864        return 0;
-865      }
-866    }
-867
-868    /**
-869     * Read off the wire. If there is not enough data to read, update the connection state with
-870     *  what we have and returns.
-871     * @return Returns -1 if failure (and caller will close connection), else zero or more.
-872     * @throws IOException
-873     * @throws InterruptedException
-874     */
-875    public int readAndProcess() throws IOException, InterruptedException {
-876      // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
-877      // does, read in the rest of the connection preamble, the version and the auth method.
-878      // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
-879      // length into the 4-byte this.dataLengthBuffer.
-880      int count = read4Bytes();
-881      if (count < 0 || dataLengthBuffer.remaining() > 0) {
-882        return count;
-883      }
-884
-885      // If we have not read the connection setup preamble, look to see if that is on the wire.
-886      if (!connectionPreambleRead) {
-887        count = readPreamble();
-888        if (!connectionPreambleRead) {
-889          return count;
-890        }
-891
-892        count = read4Bytes();
-893        if (count < 0 || dataLengthBuffer.remaining() > 0) {
-894          return count;
-895        }
-896      }
-897
-898      // We have read a length and we have read the preamble.  It is either the connection header
-899      // or it is a request.
-900      if (data == null) {
-901        dataLengthBuffer.flip();
-902        int dataLength = dataLengthBuffer.getInt();
-903        if (dataLength == RpcClient.PING_CALL_ID) {
-904          if (!useWrap) { //cover