Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1C8B4C156 for ; Mon, 8 Dec 2014 23:23:56 +0000 (UTC) Received: (qmail 44238 invoked by uid 500); 8 Dec 2014 23:23:56 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 44194 invoked by uid 500); 8 Dec 2014 23:23:55 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 44175 invoked by uid 99); 8 Dec 2014 23:23:55 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Dec 2014 23:23:55 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9A70AA1EB9E; Mon, 8 Dec 2014 23:23:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: stack@apache.org To: commits@hbase.apache.org Date: Mon, 08 Dec 2014 23:23:58 -0000 Message-Id: <8ddddb51ce744c34aafbe3bb022aa4fe@git.apache.org> In-Reply-To: <761c4ddb85cc4b35993ee259989d5834@git.apache.org> References: <761c4ddb85cc4b35993ee259989d5834@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/4] hbase git commit: HBASE-12597 Add RpcClient interface and enable changing of RpcClient implementation (Jurriaan Mous) HBASE-12597 Add RpcClient interface and enable changing of RpcClient implementation (Jurriaan Mous) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1d880e3f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1d880e3f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1d880e3f Branch: refs/heads/branch-1 Commit: 1d880e3f60989437ab10ecc285037b32bc694464 Parents: 823656b Author: stack Authored: Mon Dec 8 15:23:38 2014 -0800 Committer: stack Committed: Mon Dec 8 15:23:38 2014 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/ConnectionManager.java | 19 +- .../client/PreemptiveFastFailInterceptor.java | 2 +- .../hadoop/hbase/ipc/AbstractRpcClient.java | 177 ++ .../java/org/apache/hadoop/hbase/ipc/Call.java | 127 ++ .../hadoop/hbase/ipc/CallTimeoutException.java | 35 + .../apache/hadoop/hbase/ipc/ConnectionId.java | 78 + .../hadoop/hbase/ipc/FailedServerException.java | 37 + .../apache/hadoop/hbase/ipc/FailedServers.java | 79 + .../org/apache/hadoop/hbase/ipc/IPCUtil.java | 22 +- .../org/apache/hadoop/hbase/ipc/RpcClient.java | 1745 +----------------- .../hadoop/hbase/ipc/RpcClientFactory.java | 70 + .../apache/hadoop/hbase/ipc/RpcClientImpl.java | 1383 ++++++++++++++ .../hbase/zookeeper/MetaTableLocator.java | 4 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 2 +- .../hadoop/hbase/master/AssignmentManager.java | 2 +- .../hbase/regionserver/HRegionServer.java | 12 +- .../client/TestClientScannerRPCTimeout.java | 4 +- .../hadoop/hbase/client/TestClientTimeouts.java | 52 +- .../hbase/client/TestFromClientSideNoCodec.java | 6 +- .../org/apache/hadoop/hbase/client/TestHCM.java | 2 +- .../hbase/filter/FilterTestingCluster.java | 4 +- .../apache/hadoop/hbase/ipc/TestDelayedRpc.java | 15 +- .../hadoop/hbase/ipc/TestHBaseClient.java | 2 +- .../org/apache/hadoop/hbase/ipc/TestIPC.java | 20 +- .../hadoop/hbase/ipc/TestProtoBufRpc.java | 4 +- .../hbase/master/TestHMasterRPCException.java | 5 +- .../hadoop/hbase/security/TestSecureRPC.java | 8 +- .../security/token/TestTokenAuthentication.java | 5 +- .../snapshot/TestFlushSnapshotFromClient.java | 3 +- 29 files changed, 2115 insertions(+), 1809 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 1026d86..df96274 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -615,7 +616,7 @@ class ConnectionManager { this.registry = setupRegistry(); retrieveClusterId(); - this.rpcClient = new RpcClient(this.conf, this.clusterId); + this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); // Do we publish the status? @@ -635,7 +636,7 @@ class ConnectionManager { @Override public void newDead(ServerName sn) { clearCaches(sn); - rpcClient.cancelConnections(sn.getHostname(), sn.getPort()); + rpcClient.cancelConnections(sn); } }, conf, listenerClass); } @@ -781,18 +782,6 @@ class ConnectionManager { /** * For tests only. - * @param rpcClient Client we should use instead. - * @return Previous rpcClient - */ - @VisibleForTesting - RpcClient setRpcClient(final RpcClient rpcClient) { - RpcClient oldRpcClient = this.rpcClient; - this.rpcClient = rpcClient; - return oldRpcClient; - } - - /** - * For tests only. */ @VisibleForTesting RpcClient getRpcClient() { @@ -2318,7 +2307,7 @@ class ConnectionManager { clusterStatusListener.close(); } if (rpcClient != null) { - rpcClient.stop(); + rpcClient.close(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java index 4256120..6fb2de3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; -import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException; +import org.apache.hadoop.hbase.ipc.FailedServerException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.ipc.RemoteException; http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java new file mode 100644 index 0000000..766ad8f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.codec.KeyValueCodec; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.util.PoolMap; +import org.apache.hadoop.io.compress.CompressionCodec; + +import java.net.SocketAddress; + +/** + * Provides the basics for a RpcClient implementation like configuration and Logging. + */ +@InterfaceAudience.Private +public abstract class AbstractRpcClient implements RpcClient { + public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class); + + protected final Configuration conf; + protected String clusterId; + protected final SocketAddress localAddr; + + protected UserProvider userProvider; + protected final IPCUtil ipcUtil; + + protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this + // time (in ms), it will be closed at any moment. + protected final int maxRetries; //the max. no. of retries for socket connections + protected final long failureSleep; // Time to sleep before retry on failure. + protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm + protected final boolean tcpKeepAlive; // if T then use keepalives + protected final Codec codec; + protected final CompressionCodec compressor; + protected final boolean fallbackAllowed; + + protected final int connectTO; + protected final int readTO; + protected final int writeTO; + + /** + * Construct an IPC client for the cluster clusterId + * + * @param conf configuration + * @param clusterId the cluster id + * @param localAddr client socket bind address. + */ + public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) { + this.userProvider = UserProvider.instantiate(conf); + this.localAddr = localAddr; + this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); + this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT; + this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); + this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true); + this.ipcUtil = new IPCUtil(conf); + + this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes + this.conf = conf; + this.codec = getCodec(); + this.compressor = getCompressor(conf); + this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); + this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT); + this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ); + this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE); + + // login the server principal (if using secure Hadoop) + if (LOG.isDebugEnabled()) { + LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + + ", tcpKeepAlive=" + this.tcpKeepAlive + + ", tcpNoDelay=" + this.tcpNoDelay + + ", connectTO=" + this.connectTO + + ", readTO=" + this.readTO + + ", writeTO=" + this.writeTO + + ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose + + ", maxRetries=" + this.maxRetries + + ", fallbackAllowed=" + this.fallbackAllowed + + ", bind address=" + (this.localAddr != null ? this.localAddr : "null")); + } + } + + @VisibleForTesting + public static String getDefaultCodec(final Configuration c) { + // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because + // Configuration will complain -- then no default codec (and we'll pb everything). Else + // default is KeyValueCodec + return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName()); + } + + /** + * Encapsulate the ugly casting and RuntimeException conversion in private method. + * @return Codec to use on this client. + */ + Codec getCodec() { + // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND + // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding. + String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf)); + if (className == null || className.length() == 0) return null; + try { + return (Codec)Class.forName(className).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Failed getting codec " + className, e); + } + } + + /** + * Encapsulate the ugly casting and RuntimeException conversion in private method. + * @param conf configuration + * @return The compressor to use on this client. + */ + private static CompressionCodec getCompressor(final Configuration conf) { + String className = conf.get("hbase.client.rpc.compressor", null); + if (className == null || className.isEmpty()) return null; + try { + return (CompressionCodec)Class.forName(className).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Failed getting compressor " + className, e); + } + } + + /** + * Return the pool type specified in the configuration, which must be set to + * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or + * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, + * otherwise default to the former. + * + * For applications with many user threads, use a small round-robin pool. For + * applications with few user threads, you may want to try using a + * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient} + * instances should not exceed the operating system's hard limit on the number of + * connections. + * + * @param config configuration + * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or + * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal} + */ + protected static PoolMap.PoolType getPoolType(Configuration config) { + return PoolMap.PoolType + .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin, + PoolMap.PoolType.ThreadLocal); + } + + /** + * Return the pool size specified in the configuration, which is applicable only if + * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}. + * + * @param config configuration + * @return the maximum pool size + */ + protected static int getPoolSize(Configuration config) { + return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java new file mode 100644 index 0000000..df32730 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.io.IOException; + +/** A call waiting for a value. */ +@InterfaceAudience.Private +public class Call { + final int id; // call id + final Message param; // rpc request method param object + /** + * Optionally has cells when making call. Optionally has cells set on response. Used + * passing cells to the rpc and receiving the response. + */ + CellScanner cells; + Message response; // value, null if error + // The return type. Used to create shell into which we deserialize the response if any. + Message responseDefaultType; + IOException error; // exception, null if value + volatile boolean done; // true when call is done + long startTime; + final Descriptors.MethodDescriptor md; + final int timeout; // timeout in millisecond for this call; 0 means infinite. + + protected Call(int id, final Descriptors.MethodDescriptor md, Message param, + final CellScanner cells, final Message responseDefaultType, int timeout) { + this.param = param; + this.md = md; + this.cells = cells; + this.startTime = EnvironmentEdgeManager.currentTime(); + this.responseDefaultType = responseDefaultType; + this.id = id; + this.timeout = timeout; + } + + /** + * Check if the call did timeout. Set an exception (includes a notify) if it's the case. + * @return true if the call is on timeout, false otherwise. + */ + public boolean checkAndSetTimeout() { + if (timeout == 0){ + return false; + } + + long waitTime = EnvironmentEdgeManager.currentTime() - getStartTime(); + if (waitTime >= timeout) { + IOException ie = new CallTimeoutException("Call id=" + id + + ", waitTime=" + waitTime + ", operationTimeout=" + timeout + " expired."); + setException(ie); // includes a notify + return true; + } else { + return false; + } + } + + public int remainingTime() { + if (timeout == 0) { + return Integer.MAX_VALUE; + } + + int remaining = timeout - (int) (EnvironmentEdgeManager.currentTime() - getStartTime()); + return remaining > 0 ? remaining : 0; + } + + @Override + public String toString() { + return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" + + (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + "}"; + } + + /** Indicate when the call is complete and the + * value or error are available. Notifies by default. */ + protected synchronized void callComplete() { + this.done = true; + notify(); // notify caller + } + + /** Set the exception when there is an error. + * Notify the caller the call is done. + * + * @param error exception thrown by the call; either local or remote + */ + public void setException(IOException error) { + this.error = error; + callComplete(); + } + + /** + * Set the return value when there is no error. + * Notify the caller the call is done. + * + * @param response return value of the call. + * @param cells Can be null + */ + public void setResponse(Message response, final CellScanner cells) { + this.response = response; + this.cells = cells; + callComplete(); + } + + public long getStartTime() { + return this.startTime; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java new file mode 100644 index 0000000..a81e5d1 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +import java.io.IOException; + +/** + * Client-side call timeout + */ +@SuppressWarnings("serial") +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class CallTimeoutException extends IOException { + public CallTimeoutException(final String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java new file mode 100644 index 0000000..a62d415 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.security.User; + +import java.net.InetSocketAddress; + +/** + * This class holds the address and the user ticket, etc. The client connections + * to servers are uniquely identified by + */ +@InterfaceAudience.Private +public class ConnectionId { + final InetSocketAddress address; + final User ticket; + private static final int PRIME = 16777619; + final String serviceName; + + public ConnectionId(User ticket, String serviceName, InetSocketAddress address) { + this.address = address; + this.ticket = ticket; + this.serviceName = serviceName; + } + + public String getServiceName() { + return this.serviceName; + } + + public InetSocketAddress getAddress() { + return address; + } + + public User getTicket() { + return ticket; + } + + @Override + public String toString() { + return this.address.toString() + "/" + this.serviceName + "/" + this.ticket; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ConnectionId) { + ConnectionId id = (ConnectionId) obj; + return address.equals(id.address) && + ((ticket != null && ticket.equals(id.ticket)) || + (ticket == id.ticket)) && + this.serviceName == id.serviceName; + } + return false; + } + + @Override // simply use the default Object#hashcode() ? + public int hashCode() { + int hashcode = (address.hashCode() + + PRIME * (PRIME * this.serviceName.hashCode() ^ + (ticket == null ? 0 : ticket.hashCode()))); + return hashcode; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServerException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServerException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServerException.java new file mode 100644 index 0000000..12f6451 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServerException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Indicates that we're trying to connect to a already known as dead server. We will want to + * retry: we're getting this because the region location was wrong, or because + * the server just died, in which case the retry loop will help us to wait for the + * regions to recover. + */ +@SuppressWarnings("serial") +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class FailedServerException extends HBaseIOException { + public FailedServerException(String s) { + super(s); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java new file mode 100644 index 0000000..16ec16c --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FailedServers.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; + +import java.net.InetSocketAddress; +import java.util.Iterator; +import java.util.LinkedList; + +/** + * A class to manage a list of servers that failed recently. + */ +@InterfaceAudience.Private +public class FailedServers { + private final LinkedList> failedServers = new + LinkedList>(); + private final int recheckServersTimeout; + + public FailedServers(Configuration conf) { + this.recheckServersTimeout = conf.getInt( + RpcClient.FAILED_SERVER_EXPIRY_KEY, RpcClient.FAILED_SERVER_EXPIRY_DEFAULT); + } + + /** + * Add an address to the list of the failed servers list. + */ + public synchronized void addToFailedServers(InetSocketAddress address) { + final long expiry = EnvironmentEdgeManager.currentTime() + recheckServersTimeout; + failedServers.addFirst(new Pair(expiry, address.toString())); + } + + /** + * Check if the server should be considered as bad. Clean the old entries of the list. + * + * @return true if the server is in the failed servers list + */ + public synchronized boolean isFailedServer(final InetSocketAddress address) { + if (failedServers.isEmpty()) { + return false; + } + + final String lookup = address.toString(); + final long now = EnvironmentEdgeManager.currentTime(); + + // iterate, looking for the search entry and cleaning expired entries + Iterator> it = failedServers.iterator(); + while (it.hasNext()) { + Pair cur = it.next(); + if (cur.getFirst() < now) { + it.remove(); + } else { + if (lookup.equals(cur.getSecond())) { + return true; + } + } + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/1d880e3f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 67e2524..b7e7728 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -51,7 +51,7 @@ import com.google.protobuf.Message; * Utility to help ipc'ing. */ @InterfaceAudience.Private -class IPCUtil { +public class IPCUtil { public static final Log LOG = LogFactory.getLog(IPCUtil.class); /** * How much we think the decompressor will expand the original compressed content. @@ -60,7 +60,7 @@ class IPCUtil { private final int cellBlockBuildingInitialBufferSize; private final Configuration conf; - IPCUtil(final Configuration conf) { + public IPCUtil(final Configuration conf) { super(); this.conf = conf; this.cellBlockDecompressionMultiplier = @@ -81,14 +81,14 @@ class IPCUtil { * compressor. * @param codec * @param compressor - * @Param cellScanner + * @param cellScanner * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using * passed in codec and/or compressor; the returned buffer has been * flipped and is ready for reading. Use limit to find total size. * @throws IOException */ @SuppressWarnings("resource") - ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, + public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, final CellScanner cellScanner) throws IOException { if (cellScanner == null) return null; @@ -145,7 +145,7 @@ class IPCUtil { * @return CellScanner to work against the content of cellBlock * @throws IOException */ - CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, + public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, final byte [] cellBlock) throws IOException { return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length); @@ -159,7 +159,7 @@ class IPCUtil { * @return CellScanner to work against the content of cellBlock * @throws IOException */ - CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, + public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, final byte [] cellBlock, final int offset, final int length) throws IOException { // If compressed, decompress it first before passing it on else we will leak compression @@ -200,7 +200,7 @@ class IPCUtil { * @return The passed in Message serialized with delimiter. Return null if m is null * @throws IOException */ - static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException { + public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException { if (m == null) return null; int serializedSize = m.getSerializedSize(); int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize); @@ -223,7 +223,7 @@ class IPCUtil { * @return Total number of bytes written. * @throws IOException */ - static int write(final OutputStream dos, final Message header, final Message param, + public static int write(final OutputStream dos, final Message header, final Message param, final ByteBuffer cellBlock) throws IOException { // Must calculate total size and write that first so other side can read it all in in one @@ -255,7 +255,7 @@ class IPCUtil { * @param len * @throws IOException */ - static void readChunked(final DataInput in, byte[] dest, int offset, int len) + public static void readChunked(final DataInput in, byte[] dest, int offset, int len) throws IOException { int maxRead = 8192; @@ -265,11 +265,9 @@ class IPCUtil { } /** - * @param header - * @param body * @return Size on the wire when the two messages are written with writeDelimitedTo */ - static int getTotalSizeWhenWrittenDelimited(Message ... messages) { + public static int getTotalSizeWhenWrittenDelimited(Message ... messages) { int totalSize = 0; for (Message m: messages) { if (m == null) continue;