Subject: svn commit: r1568185 - in /hbase/branches/hbase-10070: ./ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-server/src/test/java/org/apache/hadoop/hbase/client/ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/
Date: Fri, 14 Feb 2014 01:35:05 -0000
To: commits@hbase.apache.org
From: enis@apache.org
Reply-To: dev@hbase.apache.org

Author: enis
Date: Fri Feb 14 01:35:04 2014
New Revision: 1568185

URL: http://svn.apache.org/r1568185
Log:
HBASE-10490 Simplify RpcClient code (Nicolas Liochon)

Modified:
    hbase/branches/hbase-10070/   (props changed)
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java

Propchange: hbase/branches/hbase-10070/
------------------------------------------------------------------------------
  Merged /hbase/trunk:r1567919

Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1568185&r1=1568184&r2=1568185&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Fri Feb 14 01:35:04 2014
@@ -19,37 +19,14 @@ package org.apache.hadoop.hbase.ipc;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.net.SocketFactory;
-import javax.security.sasl.SaslException;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -94,14 +71,34 @@ import org.apache.hadoop.security.token.
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
+import javax.net.SocketFactory;
+import javax.security.sasl.SaslException;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -115,16 +112,15 @@ public class RpcClient {
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcClient");
   protected final PoolMap<ConnectionId, Connection> connections;
 
-  protected int counter;                            // counter for call ids
+  protected final AtomicInteger callIdCnt = new AtomicInteger();
   protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
   final protected Configuration conf;
-  final protected int maxIdleTime; // connections will be culled if it was idle for
-                                   // maxIdleTime microsecs
+  final protected int minIdleTimeBeforeClose; // if the connection is iddle for more than this
+                                              // time (in ms), it will be closed at any moment.
   final protected int maxRetries; //the max. no. of retries for socket connections
   final protected long failureSleep; // Time to sleep before retry on failure.
   protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
   protected final boolean tcpKeepAlive; // if T then use keepalives
-  protected int pingInterval; // how often sends ping to the server in msecs
   protected FailedServers failedServers;
   private final Codec codec;
   private final CompressionCodec compressor;
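
The hunk above also renames the tuning knob: the old "hbase.ipc.client.connection.maxidletime" (10 seconds) becomes "hbase.ipc.client.connection.minIdleTimeBeforeClose", which the constructor later in this patch defaults to 120000 ms. A minimal, hypothetical snippet showing how a client could override it (the class name is illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class IdleTimeConfig {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Connections idle longer than this many milliseconds may be closed by the client.
        // The patch's default is 120000 (2 minutes); the old 10s "maxidletime" key is gone.
        conf.setInt("hbase.ipc.client.connection.minIdleTimeBeforeClose", 60000);
        System.out.println(conf.getInt("hbase.ipc.client.connection.minIdleTimeBeforeClose", 120000));
      }
    }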
@@ -137,11 +133,9 @@ public class RpcClient {
   private final boolean fallbackAllowed;
   private UserProvider userProvider;
 
-  final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
   final private static String SOCKET_TIMEOUT = "ipc.socket.timeout";
-  final static int DEFAULT_PING_INTERVAL = 60000;  // 1 min
   final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
-  final static int PING_CALL_ID = -1;
+  final static int PING_CALL_ID = -1; // Used by the server, for compatibility with old clients.
 
   public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
   public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
@@ -213,10 +207,16 @@ public class RpcClient {
     }
   }
 
+
+  /**
+   * Indicates that we're trying to connect to a already known as dead server. We will want to
+   * retry: we're getting this because the region location was wrong, or because
+   * the server just died, in which case the retry loop will help us to wait for the
+   * regions to recover.
+   */
   @SuppressWarnings("serial")
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
-  // Shouldn't this be a DoNotRetryException? St.Ack 10/2/2013
   public static class FailedServerException extends HBaseIOException {
     public FailedServerException(String s) {
       super(s);
@@ -224,29 +224,6 @@ public class RpcClient {
   }
 
   /**
-   * set the ping interval value in configuration
-   *
-   * @param conf Configuration
-   * @param pingInterval the ping interval
-   */
-  // Any reason we couldn't just do tcp keepalive instead of this pingery?
-  // St.Ack 20130121
-  public static void setPingInterval(Configuration conf, int pingInterval) {
-    conf.setInt(PING_INTERVAL_NAME, pingInterval);
-  }
-
-  /**
-   * Get the ping interval from configuration;
-   * If not set in the configuration, return the default value.
-   *
-   * @param conf Configuration
-   * @return the ping interval
-   */
-  static int getPingInterval(Configuration conf) {
-    return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
-  }
-
-  /**
    * Set the socket timeout
    * @param conf Configuration
    * @param socketTimeout the socket timeout
@@ -286,9 +263,7 @@ public class RpcClient {
       this.cells = cells;
       this.startTime = System.currentTimeMillis();
       this.responseDefaultType = responseDefaultType;
-      synchronized (RpcClient.this) {
-        this.id = counter++;
-      }
+      this.id = callIdCnt.getAndIncrement();
     }
 
     @Override
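
The hunk above is the heart of the call-id change: ids used to be handed out under a synchronized block on the whole client, and now they come from AtomicInteger.getAndIncrement(). A small self-contained sketch of that pattern (names are illustrative, not the real class):

    import java.util.concurrent.atomic.AtomicInteger;

    public class CallIdDemo {
      // Before the patch: "synchronized (RpcClient.this) { this.id = counter++; }" on a plain int.
      private static final AtomicInteger CALL_ID_CNT = new AtomicInteger();

      static int nextCallId() {
        // Lock-free and safe from any number of caller threads.
        return CALL_ID_CNT.getAndIncrement();
      }

      public static void main(String[] args) {
        System.out.println(nextCallId()); // 0
        System.out.println(nextCallId()); // 1
      }
    }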
@@ -358,7 +333,7 @@ public class RpcClient {
     protected ConnectionId remoteId;
     protected Socket socket = null;              // connected socket
     protected DataInputStream in;
-    protected DataOutputStream out;
+    protected DataOutputStream out; // Warning: can be locked inside a class level lock.
     private InetSocketAddress server;            // server ip:port
     private String serverPrincipal;              // server's krb5 principal name
     private AuthMethod authMethod;               // authentication method
@@ -372,11 +347,8 @@ public class RpcClient {
     // currently active calls
     protected final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<Integer, Call>();
-    protected final AtomicLong lastActivity =
-      new AtomicLong(); // last I/O activity time
-    protected final AtomicBoolean shouldCloseConnection =
-      new AtomicBoolean(); // indicate if the connection is closed
-    protected IOException closeException; // close reason
+
+    protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
 
     Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
     throws IOException {
@@ -470,97 +442,6 @@ public class RpcClient {
       return userInfoPB.build();
     }
 
-    /** Update lastActivity with the current time. */
-    protected void touch() {
-      lastActivity.set(System.currentTimeMillis());
-    }
-
-    /**
-     * Add a call to this connection's call queue and notify
-     * a listener; synchronized. If the connection is dead, the call is not added, and the
-     * caller is notified.
-     * This function can return a connection that is already marked as 'shouldCloseConnection'
-     * It is up to the user code to check this status.
-     * @param call to add
-     */
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
-        justification="Notify because new call available for processing")
-    protected synchronized void addCall(Call call) {
-      // If the connection is about to close, we manage this as if the call was already added
-      // to the connection calls list. If not, the connection creations are serialized, as
-      // mentioned in HBASE-6364
-      if (this.shouldCloseConnection.get()) {
-        if (this.closeException == null) {
-          call.setException(new IOException(
-              "Call " + call.id + " not added as the connection " + remoteId + " is closing"));
-        } else {
-          call.setException(this.closeException);
-        }
-        synchronized (call) {
-          call.notifyAll();
-        }
-      } else {
-        calls.put(call.id, call);
-        synchronized (call) {
-          notify();
-        }
-      }
-    }
-
-    /** This class sends a ping to the remote side when timeout on
-     * reading. If no failure is detected, it retries until at least
-     * a byte is read.
-     */
-    protected class PingInputStream extends FilterInputStream {
-      /* constructor */
-      protected PingInputStream(InputStream in) {
-        super(in);
-      }
-
-      /* Process timeout exception
-       * if the connection is not going to be closed, send a ping.
-       * otherwise, throw the timeout exception.
-       */
-      private void handleTimeout(SocketTimeoutException e) throws IOException {
-        if (shouldCloseConnection.get() || !running.get() || remoteId.rpcTimeout > 0) {
-          throw e;
-        }
-        sendPing();
-      }
-
-      /** Read a byte from the stream.
-       * Send a ping if timeout on read. Retries if no failure is detected
-       * until a byte is read.
-       * @throws IOException for any IO problem other than socket timeout
-       */
-      @Override
-      public int read() throws IOException {
-        do {
-          try {
-            return super.read();
-          } catch (SocketTimeoutException e) {
-            handleTimeout(e);
-          }
-        } while (true);
-      }
-
-      /** Read bytes into a buffer starting from offset off
-       * Send a ping if timeout on read. Retries if no failure is detected
-       * until a byte is read.
-       *
-       * @return the total number of bytes read; -1 if the connection is closed.
-       */
-      @Override
-      public int read(byte[] buf, int off, int len) throws IOException {
-        do {
-          try {
-            return super.read(buf, off, len);
-          } catch (SocketTimeoutException e) {
-            handleTimeout(e);
-          }
-        } while (true);
-      }
-    }
 
     protected synchronized void setupConnection() throws IOException {
       short ioFailures = 0;
@@ -576,10 +457,7 @@
           // connection time out is 20s
           NetUtils.connect(this.socket, remoteId.getAddress(), getSocketTimeout(conf));
-          if (remoteId.rpcTimeout > 0) {
-            pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
-          }
-          this.socket.setSoTimeout(pingInterval);
+          this.socket.setSoTimeout(remoteId.rpcTimeout);
           return;
         } catch (SocketTimeoutException toe) {
           /* The max number of retries is 45,
@@ -663,30 +541,32 @@
           " time(s).");
     }
 
+    /**
+     * @throws IOException if the connection is not open.
+     */
+    private void checkIsOpen() throws IOException {
+      if (shouldCloseConnection.get()) {
+        throw new IOException(getName() + " is closing");
+      }
+    }
+
     /* wait till someone signals us to start reading RPC response or
      * it is idle too long, it is marked as to be closed,
      * or the client is marked as not running.
      *
      * Return true if it is time to read a response; false otherwise.
      */
-    protected synchronized boolean waitForWork() {
-      if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
-        long timeout = maxIdleTime - (System.currentTimeMillis()-lastActivity.get());
-        if (timeout>0) {
-          try {
-            wait(timeout);
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-          }
-        }
+    protected synchronized boolean waitForWork() throws InterruptedException{
+      while (calls.isEmpty() && !shouldCloseConnection.get() && running.get() ) {
+        wait(minIdleTimeBeforeClose);
       }
 
       if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
        return true;
      } else if (shouldCloseConnection.get()) {
        return false;
-      } else if (calls.isEmpty()) { // idle connection closed or stopped
-        markClosed(null);
+      } else if (calls.isEmpty()) {
+        markClosed(new IOException("idle connection closed or stopped"));
        return false;
      } else { // get stopped but there are still pending requests
        markClosed((IOException)new IOException().initCause(
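
The new waitForWork above parks the reader thread in wait(minIdleTimeBeforeClose) until a call is queued, a close is requested, or the client stops, and an idle connection is now closed with an explicit IOException instead of markClosed(null). The sketch below condenses that wait/notify idle-close pattern into a standalone class; it adds an explicit deadline (the patch itself leaves re-arming to the surrounding reader loop), and every name here is illustrative:

    import java.io.IOException;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class IdleConnectionMonitor {
      private final ConcurrentSkipListMap<Integer, Object> calls =
          new ConcurrentSkipListMap<Integer, Object>();
      private final AtomicBoolean shouldClose = new AtomicBoolean(false);
      private final AtomicBoolean running = new AtomicBoolean(true);
      private final long minIdleTimeBeforeClose = 120000; // ms

      /** Returns true if there is work to read, false if the connection should be torn down. */
      synchronized boolean waitForWork() throws InterruptedException {
        long deadline = System.currentTimeMillis() + minIdleTimeBeforeClose;
        while (calls.isEmpty() && !shouldClose.get() && running.get()) {
          long remaining = deadline - System.currentTimeMillis();
          if (remaining <= 0) {
            markClosed(new IOException("idle connection closed or stopped"));
            return false;
          }
          wait(remaining); // woken early by notifyAll() when a call is queued or close is requested
        }
        return !calls.isEmpty() && !shouldClose.get() && running.get();
      }

      synchronized void markClosed(IOException why) {
        // The reason is reported per call elsewhere; the flag plus a wake-up is all we need here.
        if (shouldClose.compareAndSet(false, true)) {
          notifyAll();
        }
      }

      synchronized void addCall(int id, Object call) {
        calls.put(id, call);
        notifyAll();
      }
    }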
@@ -699,22 +579,6 @@
       return remoteId.getAddress();
     }
 
-    /* Send a ping to the server if the time elapsed
-     * since last I/O activity is equal to or greater than the ping interval
-     */
-    protected synchronized void sendPing() throws IOException {
-      // Can we do tcp keepalive instead of this pinging?
-      long curTime = System.currentTimeMillis();
-      if ( curTime - lastActivity.get() >= pingInterval) {
-        lastActivity.set(curTime);
-        //noinspection SynchronizeOnNonFinalField
-        synchronized (this.out) {
-          out.writeInt(PING_CALL_ID);
-          out.flush();
-        }
-      }
-    }
-
     @Override
     public void run() {
       if (LOG.isDebugEnabled()) {
@@ -836,8 +700,7 @@
       });
     }
 
-    protected synchronized void setupIOstreams()
-        throws IOException, InterruptedException {
+    protected synchronized void setupIOstreams() throws IOException {
       if (socket != null || shouldCloseConnection.get()) {
         return;
       }
@@ -867,7 +730,7 @@
         // This creates a socket with a write timeout. This timeout cannot be changed,
         // RpcClient allows to change the timeout dynamically, but we can only
        // change the read timeout today.
-        OutputStream outStream = NetUtils.getOutputStream(socket, pingInterval);
+        OutputStream outStream = NetUtils.getOutputStream(socket, remoteId.rpcTimeout);
         // Write out the preamble -- MAGIC, version, and auth to use.
         writeConnectionHeaderPreamble(outStream);
         if (useSasl) {
@@ -879,7 +742,7 @@
               ticket = ticket.getRealUser();
             }
           }
-          boolean continueSasl = false;
+          boolean continueSasl;
           if (ticket == null) throw new FatalConnectionException("ticket/user is null");
           try {
             continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
@@ -905,28 +768,24 @@
             useSasl = false;
           }
         }
-        this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));
+        this.in = new DataInputStream(new BufferedInputStream(inStream));
         this.out = new DataOutputStream(new BufferedOutputStream(outStream));
         // Now write out the connection header
         writeConnectionHeader();
 
-        // update last activity time
-        touch();
-
         // start the receiver thread after the socket connection has been set up
         start();
         return;
       }
     } catch (Throwable t) {
       failedServers.addToFailedServers(remoteId.address);
-      IOException e = null;
+      IOException e;
       if (t instanceof IOException) {
         e = (IOException)t;
-        markClosed(e);
       } else {
         e = new IOException("Could not set up IO Streams", t);
-        markClosed(e);
       }
+      markClosed(e);
       close();
       throw e;
     }
@@ -986,28 +845,15 @@
       this.in = null;
       disposeSasl();
 
-      // clean up all calls
-      if (closeException == null) {
-        if (!calls.isEmpty()) {
-          LOG.warn(getName() + ": connection is closed for no cause and calls are not empty. " +
-              "#Calls: " + calls.size());
-
-          // clean up calls anyway
-          closeException = new IOException("Unexpected closed connection");
-          cleanupCalls();
-        }
-      } else {
-        // log the info
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getName() + ": closing ipc connection to " + server + ": " +
-              closeException.getMessage(), closeException);
-        }
-
-        // cleanup calls
-        cleanupCalls();
+      // log the info
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getName() + ": closing ipc connection to " + server);
       }
+
+      cleanupCalls();
+
       if (LOG.isDebugEnabled())
-        LOG.debug(getName() + ": closed");
+        LOG.debug(getName() + ": ipc connection closed");
     }
 
     /**
@@ -1018,36 +864,56 @@
      * @param priority
      * @see #readResponse()
      */
-    protected void writeRequest(Call call, final int priority) {
-      if (shouldCloseConnection.get()) return;
-      try {
-        RequestHeader.Builder builder = RequestHeader.newBuilder();
-        builder.setCallId(call.id);
-        if (Trace.isTracing()) {
-          Span s = Trace.currentSpan();
-          builder.setTraceInfo(RPCTInfo.newBuilder().
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
+        justification = "on close the reader thread must stop")
+    protected void writeRequest(Call call, final int priority) throws IOException {
+      RequestHeader.Builder builder = RequestHeader.newBuilder();
+      builder.setCallId(call.id);
+      if (Trace.isTracing()) {
+        Span s = Trace.currentSpan();
+        builder.setTraceInfo(RPCTInfo.newBuilder().
             setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
-        }
-        builder.setMethodName(call.md.getName());
-        builder.setRequestParam(call.param != null);
-        ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
-        if (cellBlock != null) {
-          CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
-          cellBlockBuilder.setLength(cellBlock.limit());
-          builder.setCellBlockMeta(cellBlockBuilder.build());
-        }
-        // Only pass priority if there one. Let zero be same as no priority.
-        if (priority != 0) builder.setPriority(priority);
-        //noinspection SynchronizeOnNonFinalField
-        RequestHeader header = builder.build();
+      }
+      builder.setMethodName(call.md.getName());
+      builder.setRequestParam(call.param != null);
+      ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
+      if (cellBlock != null) {
+        CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
+        cellBlockBuilder.setLength(cellBlock.limit());
+        builder.setCellBlockMeta(cellBlockBuilder.build());
+      }
+      // Only pass priority if there one. Let zero be same as no priority.
+      if (priority != 0) builder.setPriority(priority);
+      RequestHeader header = builder.build();
+
+      // Now we're going to write the call. We take the lock, then check that the connection
+      // is still valid, and, if so we do the write to the socket. If the write fails, we don't
+      // know where we stand, we have to close the connection.
+      checkIsOpen();
+      calls.put(call.id, call); // On error, the call will be removed by the timeout.
+      try {
         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
-          IPCUtil.write(this.out, header, call.param, cellBlock);
+          if (Thread.interrupted()) throw new InterruptedIOException();
+          checkIsOpen();
+
+          try {
+            IPCUtil.write(this.out, header, call.param, cellBlock);
+          } catch (IOException e) {
+            // We set the value inside the synchronized block, this way the next in line
+            // won't even try to write
+            shouldCloseConnection.set(true);
+            throw e;
+          }
         }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
+      } finally {
+        synchronized (this) {
+          // We added a call, and may start the connection clode. In both cases, we
+          // need to notify the reader.
+          notifyAll();
        }
-      } catch(IOException e) {
-        markClosed(e);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
      }
    }
@@ -1056,7 +922,6 @@
      */
     protected void readResponse() {
       if (shouldCloseConnection.get()) return;
-      touch();
       int totalSize = -1;
       try {
         // See HBaseServer.Call.setResponse for where we write out the response.
@@ -1070,7 +935,7 @@
           LOG.debug(getName() + ": got response header " +
              TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes");
         }
-        Call call = calls.get(id);
+        Call call = calls.remove(id);
         if (call == null) {
           // So we got a response for which we have no corresponding 'call' here on the client-side.
           // We probably timed out waiting, cleaned up all references, and now the server decides
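
Taken together, the writeRequest and readResponse hunks above change the life cycle of a pending call: it is registered in the calls map before the bytes go out, the write happens under the out lock with shouldCloseConnection flipped on any write failure, and the entry is removed when its response arrives (or by the timeout sweep). A compressed, hypothetical sketch of that flow, with a byte[] payload standing in for the protobuf request:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class RequestWriter {
      private final ConcurrentSkipListMap<Integer, byte[]> calls =
          new ConcurrentSkipListMap<Integer, byte[]>();
      private final AtomicBoolean shouldClose = new AtomicBoolean(false);
      private final DataOutputStream out;

      public RequestWriter(DataOutputStream out) {
        this.out = out;
      }

      void writeRequest(int callId, byte[] payload) throws IOException {
        if (shouldClose.get()) throw new IOException("connection is closing");
        calls.put(callId, payload); // registered first; a timeout sweep can still remove it on error
        try {
          synchronized (out) { // one writer at a time on the shared stream
            if (Thread.interrupted()) throw new InterruptedIOException();
            if (shouldClose.get()) throw new IOException("connection is closing");
            try {
              out.writeInt(payload.length);
              out.write(payload);
              out.flush();
            } catch (IOException e) {
              // A failed write leaves the stream in an unknown state: mark the connection
              // for close while still holding the lock so the next writer does not even try.
              shouldClose.set(true);
              throw e;
            }
          }
        } finally {
          synchronized (this) {
            notifyAll(); // wake the reader: either a new call is pending or the connection is closing
          }
        }
      }

      /** Called by the reader thread once the response header for callId has been decoded. */
      byte[] completeCall(int callId) {
        return calls.remove(callId); // may be null if the call already timed out
      }
    }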
@@ -1110,13 +975,11 @@
           // timeout, so check if it still exists before setting the value.
           if (call != null) call.setResponse(value, cellBlockScanner);
         }
-        if (call != null) calls.remove(id);
       } catch (IOException e) {
         if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
           // Clean up open calls but don't treat this as a fatal condition,
           // since we expect certain responses to not make it by the specified
           // {@link ConnectionId#rpcTimeout}.
-          closeException = e;
         }
         if (ExceptionUtil.isInterrupt(e)){
         } else {
@@ -1154,10 +1017,18 @@
           e.getStackTrace(), doNotRetry);
     }
 
-    protected synchronized void markClosed(IOException e) {
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
+        justification = "on close the reader thread must stop")
+    protected void markClosed(IOException e) {
+      if (e == null) throw new NullPointerException();
+
       if (shouldCloseConnection.compareAndSet(false, true)) {
-        closeException = e;
-        notifyAll();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(getName() + ": marking at should closed, reason =" + e.getMessage());
+        }
+        synchronized (this) {
+          notifyAll();
+        }
       }
     }
@@ -1167,46 +1038,38 @@
     }
 
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
-        justification="Notify because timedout")
+        justification="Notify because timeout")
     protected void cleanupCalls(long rpcTimeout) {
       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
       while (itor.hasNext()) {
         Call c = itor.next().getValue();
         long waitTime = System.currentTimeMillis() - c.getStartTime();
         if (waitTime >= rpcTimeout) {
-          if (this.closeException == null) {
-            // There may be no exception in the case that there are many calls
-            // being multiplexed over this connection and these are succeeding
-            // fine while this Call object is taking a long time to finish
-            // over on the server; e.g. I just asked the regionserver to bulk
-            // open 3k regions or its a big fat multiput into a heavily-loaded
-            // server (Perhaps this only happens at the extremes?)
-            this.closeException = new CallTimeoutException("Call id=" + c.id +
-              ", waitTime=" + waitTime + ", rpcTimetout=" + rpcTimeout);
-          }
-          c.setException(this.closeException);
-          synchronized (c) {
-            c.notifyAll();
-          }
+          IOException ie = new CallTimeoutException("Call id=" + c.id +
+              ", waitTime=" + waitTime + ", rpcTimeout=" + rpcTimeout);
+          c.setException(ie);
           itor.remove();
         } else {
+          // This relies on the insertion order to be the call id order. This is not
+          // true under 'difficult' conditions (gc, ...).
           break;
         }
       }
 
-      try {
-        if (!calls.isEmpty()) {
-          Call firstCall = calls.get(calls.firstKey());
-          long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
-          if (maxWaitTime < rpcTimeout) {
-            rpcTimeout -= maxWaitTime;
-          }
+
+      if (!calls.isEmpty()) {
+        Call firstCall = calls.get(calls.firstKey());
+        long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
+        if (maxWaitTime < rpcTimeout) {
+          rpcTimeout -= maxWaitTime;
         }
+      }
+
+      try {
         if (!shouldCloseConnection.get()) {
-          closeException = null;
           setSocketTimeout(socket, (int) rpcTimeout);
         }
       } catch (SocketException e) {
-        LOG.debug("Couldn't lower timeout, which may result in longer than expected calls");
+        LOG.warn("Couldn't lower timeout, which may result in longer than expected calls");
       }
     }
   }
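
cleanupCalls above now gives every expired call its own CallTimeoutException instead of reusing a shared closeException, and it walks the map in call-id order, stopping at the first call that is still young. A minimal stand-alone version of that sweep (PendingCall and the plain IOException are illustrative stand-ins for the real Call and CallTimeoutException types):

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.Map.Entry;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class CallTimeoutSweep {
      static class PendingCall {
        final int id;
        final long startTime = System.currentTimeMillis();
        volatile IOException error;
        PendingCall(int id) { this.id = id; }
      }

      private final ConcurrentSkipListMap<Integer, PendingCall> calls =
          new ConcurrentSkipListMap<Integer, PendingCall>();

      void cleanupCalls(long rpcTimeout) {
        Iterator<Entry<Integer, PendingCall>> itor = calls.entrySet().iterator();
        while (itor.hasNext()) {
          PendingCall c = itor.next().getValue();
          long waitTime = System.currentTimeMillis() - c.startTime;
          if (waitTime >= rpcTimeout) {
            c.error = new IOException("Call id=" + c.id + ", waitTime=" + waitTime
                + ", rpcTimeout=" + rpcTimeout);
            itor.remove();
          } else {
            // Ids are assigned in submission order, so the first young call usually ends the
            // sweep; as the patch's own comment notes, this is only approximate under GC pauses.
            break;
          }
        }
      }
    }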
@@ -1249,13 +1112,13 @@
    * @param localAddr client socket bind address
    */
   RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr) {
-    this.maxIdleTime = conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s
+    this.minIdleTimeBeforeClose =
+        conf.getInt("hbase.ipc.client.connection.minIdleTimeBeforeClose", 120000); // 2 minutes
     this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
     this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
-    this.pingInterval = getPingInterval(conf);
     this.ipcUtil = new IPCUtil(conf);
     this.conf = conf;
     this.codec = getCodec();
@@ -1273,10 +1136,9 @@
       LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
           ", tcpKeepAlive=" + this.tcpKeepAlive +
           ", tcpNoDelay=" + this.tcpNoDelay +
-          ", maxIdleTime=" + this.maxIdleTime +
+          ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
           ", maxRetries=" + this.maxRetries +
           ", fallbackAllowed=" + this.fallbackAllowed +
-          ", ping interval=" + this.pingInterval + "ms" +
           ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
     }
   }
@@ -1395,7 +1257,11 @@
     while (!connections.isEmpty()) {
       try {
         Thread.sleep(100);
-      } catch (InterruptedException ignored) {
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted while stopping the client. We still have " + connections.size() +
+            " connections.");
+        Thread.currentThread().interrupt();
+        return;
       }
     }
   }
@@ -1431,14 +1297,12 @@
     Call call = new Call(md, param, cells, returnType);
     Connection connection =
         getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
-    connection.writeRequest(call, priority);            // send the parameter
+
+    connection.writeRequest(call, priority);
 
     //noinspection SynchronizationOnLocalVariableOrMethodParameter
     synchronized (call) {
       while (!call.done) {
-        if (connection.shouldCloseConnection.get()) {
-          throw new IOException("Unexpected closed connection");
-        }
         call.wait(1000);                       // wait for the result
       }
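
In call() above, the explicit "unexpected closed connection" check inside the wait loop is gone: the caller simply parks on the Call object, and every failure path (write error, timeout sweep, connection close) now reports through setException, which wakes it up. A stripped-down sketch of that completion object (illustrative, not the real Call class):

    import java.io.IOException;

    public class CompletionDemo {
      static class Call {
        private boolean done;
        private String result;
        private IOException error;

        synchronized void setResult(String value) {
          result = value;
          done = true;
          notifyAll(); // wake the thread blocked in waitFor()
        }

        synchronized void setException(IOException e) {
          error = e;
          done = true;
          notifyAll();
        }

        synchronized String waitFor() throws IOException, InterruptedException {
          while (!done) {
            wait(1000); // re-check once a second, like RpcClient.call()
          }
          if (error != null) throw error;
          return result;
        }
      }

      public static void main(String[] args) throws Exception {
        final Call call = new Call();
        new Thread(new Runnable() {
          public void run() { call.setResult("pong"); }
        }).start();
        System.out.println(call.waitFor()); // prints "pong"
      }
    }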
@@ -1454,6 +1318,8 @@
     }
   }
 
+
+
   /**
    * Take an IOException and the address we were trying to connect to
    * and return an IOException with the input exception as the cause.
@@ -1486,7 +1352,7 @@
    * is known as actually dead. This will not prevent current operation to be retried, and,
    * depending on their own behavior, they may retry on the same server. This can be a feature,
    * for example at startup. In any case, they're likely to get connection refused (if the
-   * process died) or no route to host: i.e. there next retries should be faster and with a
+   * process died) or no route to host: i.e. their next retries should be faster and with a
    * safe exception.
    */
   public void cancelConnections(String hostname, int port, IOException ioe) {
@@ -1505,11 +1371,13 @@
     }
   }
 
-  /* Get a connection from the pool, or create a new one and add it to the
-   * pool. Connections to a given host/port are reused. */
+  /**
+   * Get a connection from the pool, or create a new one and add it to the
+   * pool. Connections to a given host/port are reused.
+   */
   protected Connection getConnection(User ticket, Call call, InetSocketAddress addr,
                                      int rpcTimeout, final Codec codec, final CompressionCodec compressor)
-  throws IOException, InterruptedException {
+  throws IOException {
     if (!running.get()) throw new StoppedRpcClientException();
     Connection connection;
     ConnectionId remoteId =
@@ -1521,7 +1389,6 @@
         connections.put(remoteId, connection);
       }
     }
-    connection.addCall(call);
 
     //we don't invoke the method below inside "synchronized (connections)"
     //block above. The reason for that is if the server happens to be slow,
@@ -1531,6 +1398,7 @@
     // waiting here; as setupIOstreams is synchronized. If the connection fails with a
     // timeout, they will all fail simultaneously. This is checked in setupIOstreams.
     connection.setupIOstreams();
+
     return connection;
   }
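
The getConnection hunks above keep the pool lock short: only the map lookup and insert happen under synchronized (connections); the potentially slow setupIOstreams runs afterwards, so a stuck server only blocks the threads that actually need it. A hypothetical pool keyed by host:port showing the same shape:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class ConnectionPool {
      static class Conn {
        final String hostAndPort;
        Conn(String hostAndPort) { this.hostAndPort = hostAndPort; }
        synchronized void setupIOstreams() throws IOException {
          // connect the socket, write the preamble, start the reader thread, ...
        }
      }

      private final Map<String, Conn> connections = new HashMap<String, Conn>();

      Conn getConnection(String hostAndPort) throws IOException {
        Conn conn;
        synchronized (connections) {
          conn = connections.get(hostAndPort);
          if (conn == null) {
            conn = new Conn(hostAndPort);
            connections.put(hostAndPort, conn); // only the map update is under the pool lock
          }
        }
        // Done outside the pool lock: callers for other servers are never blocked here, and
        // callers for this server queue on the connection's own monitor because
        // setupIOstreams is synchronized. If it fails with a timeout, they all fail together.
        conn.setupIOstreams();
        return conn;
      }
    }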
@@ -1646,7 +1514,7 @@
       // Clear it here so we don't by mistake try and these cells processing results.
       pcrc.setCellScanner(null);
     }
-    Pair<Message, CellScanner> val = null;
+    Pair<Message, CellScanner> val;
     try {
       val = call(md, param, cells, returnType, ticket, isa, rpcTimeout,
           pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);

Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java?rev=1568185&r1=1568184&r2=1568185&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java Fri Feb 14 01:35:04 2014
@@ -1552,7 +1552,6 @@ public class TestAdmin {
     TEST_UTIL.getConfiguration().setInt(
         "hbase.regionserver.logroll.errors.tolerated", 2);
-    TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
 
Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java?rev=1568185&r1=1568184&r2=1568185&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java Fri Feb 14 01:35:04 2014
@@ -60,7 +60,6 @@ public class TestLogRollAbort {
     // Tweak default timeout values down for faster recovery
     TEST_UTIL.getConfiguration().setInt(
         "hbase.regionserver.logroll.errors.tolerated", 2);
-    TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
 
Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java?rev=1568185&r1=1568184&r2=1568185&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java Fri Feb 14 01:35:04 2014
@@ -124,7 +124,6 @@ public class TestLogRolling {
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
-    TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
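
The three test changes above are the visible fallout of removing the ping machinery: "ipc.ping.interval" no longer exists, so the tests keep only the socket and rpc timeouts. For reference, a hypothetical stand-alone equivalent of the remaining setup (the class name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class TestTimeoutSetup {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.setInt("hbase.regionserver.logroll.errors.tolerated", 2);
        // "ipc.ping.interval" is gone after this patch; these two knobs remain.
        conf.setInt("ipc.socket.timeout", 10 * 1000);
        conf.setInt("hbase.rpc.timeout", 10 * 1000);
        System.out.println(conf.getInt("hbase.rpc.timeout", -1));
      }
    }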