hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nkey...@apache.org
Subject svn commit: r1375473 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-server/src/main/java/org/apache/hadoop/hbase/util/ hbase-server/src/test/java/org/apache/hadoop/hb...
Date Tue, 21 Aug 2012 11:12:11 GMT
Author: nkeywal
Date: Tue Aug 21 11:12:10 2012
New Revision: 1375473

URL: http://svn.apache.org/viewvc?rev=1375473&view=rev
Log:
HBASE-6364 Powering down the server host holding the .META. table causes HBase Client to take
excessively long to recover and connect to reassigned .META. table

Added:
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java
Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdgeManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1375473&r1=1375472&r2=1375473&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Aug
21 11:12:10 2012
@@ -187,6 +187,9 @@ public final class HConstants {
   /** Parameter name for what master implementation to use. */
   public static final String MASTER_IMPL= "hbase.master.impl";
 
+  /** Parameter name for what hbase client implementation to use. */
+  public static final String HBASECLIENT_IMPL= "hbase.hbaseclient.impl";
+
   /** Parameter name for how often threads should wake up */
   public static final String THREAD_WAKE_FREQUENCY = "hbase.server.thread.wakefrequency";
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java?rev=1375473&r1=1375472&r2=1375473&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java Tue
Aug 21 11:12:10 2012
@@ -18,12 +18,15 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
 
 import javax.net.SocketFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
 import org.apache.hadoop.io.Writable;
 
@@ -32,7 +35,7 @@ import org.apache.hadoop.io.Writable;
  * Enables reuse/sharing of clients on a per SocketFactory basis. A client 
  * establishes certain configuration dependent characteristics like timeouts, 
  * tcp-keepalive (true or false), etc. For more details on the characteristics,
- * look at {@link HBaseClient#HBaseClient(Class, Configuration, SocketFactory)}
+ * look at {@link HBaseClient#HBaseClient(Configuration, SocketFactory)}
  * Creation of dynamic proxies to protocols creates the clients (and increments
  * reference count once created), and stopping of the proxies leads to clearing
  * out references and when the reference drops to zero, the cache mapping is 
@@ -52,12 +55,29 @@ class ClientCache {
    * @param factory socket factory
    * @return an IPC client
    */
-  protected synchronized HBaseClient getClient(Configuration conf,
-      SocketFactory factory) {
+  @SuppressWarnings("unchecked")
+  protected synchronized HBaseClient getClient(Configuration conf, SocketFactory factory)
{
+
     HBaseClient client = clients.get(factory);
     if (client == null) {
+      Class<? extends HBaseClient> hbaseClientClass = (Class<? extends HBaseClient>)
conf
+          .getClass(HConstants.HBASECLIENT_IMPL, HBaseClient.class);
+
       // Make an hbase client instead of hadoop Client.
-      client = new HBaseClient(conf, factory);
+      try {
+        Constructor<? extends HBaseClient> cst = hbaseClientClass.getConstructor(
+            Configuration.class, SocketFactory.class);
+        client = cst.newInstance(conf, factory);
+      } catch (InvocationTargetException e) {
+        throw new RuntimeException(e);
+      } catch (InstantiationException e) {
+        throw new RuntimeException(e);
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      } catch (NoSuchMethodException e) {
+        throw new RuntimeException("No matching constructor in "+hbaseClientClass.getName(),
e);
+      }
+
       clients.put(factory, client);
     } else {
       client.incCount();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1375473&r1=1375472&r2=1375473&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue
Aug 21 11:12:10 2012
@@ -38,6 +38,7 @@ import java.net.UnknownHostException;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
@@ -69,6 +70,8 @@ import org.apache.hadoop.hbase.security.
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PoolMap;
 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
 import org.apache.hadoop.io.IOUtils;
@@ -113,6 +116,7 @@ public class HBaseClient {
   protected final boolean tcpKeepAlive; // if T then use keepalives
   protected int pingInterval; // how often sends ping to the server in msecs
   protected int socketTimeout; // socket timeout
+  protected FailedServers failedServers;
 
   protected final SocketFactory socketFactory;           // how to create sockets
   private int refCount = 1;
@@ -124,6 +128,68 @@ public class HBaseClient {
   final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
   final static int PING_CALL_ID = -1;
 
+  public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
+  public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
+
+  /**
+   * A class to manage a list of servers that failed recently.
+   */
+  static class FailedServers {
+    private final LinkedList<Pair<Long, String>> failedServers = new
+        LinkedList<Pair<Long, java.lang.String>>();
+    private final int recheckServersTimeout;
+
+    FailedServers(Configuration conf) {
+      this.recheckServersTimeout = conf.getInt(
+          FAILED_SERVER_EXPIRY_KEY, FAILED_SERVER_EXPIRY_DEFAULT);
+    }
+
+    /**
+     * Add an address to the list of the failed servers list.
+     */
+    public synchronized void addToFailedServers(InetSocketAddress address) {
+      final long expiry = EnvironmentEdgeManager.currentTimeMillis() + recheckServersTimeout;
+      failedServers.addFirst(new Pair<Long, String>(expiry, address.toString()));
+    }
+
+    /**
+     * Check if the server should be considered as bad. Clean the old entries of the list.
+     *
+     * @return true if the server is in the failed servers list
+     */
+    public synchronized boolean isFailedServer(final InetSocketAddress address) {
+      if (failedServers.isEmpty()) {
+        return false;
+      }
+
+      final String lookup = address.toString();
+      final long now = EnvironmentEdgeManager.currentTimeMillis();
+
+      // iterate, looking for the search entry and cleaning expired entries
+      Iterator<Pair<Long, String>> it = failedServers.iterator();
+      while (it.hasNext()) {
+        Pair<Long, String> cur = it.next();
+        if (cur.getFirst() < now) {
+          it.remove();
+        } else {
+          if (lookup.equals(cur.getSecond())) {
+            return true;
+          }
+        }
+      }
+
+      return false;
+    }
+
+  }
+
+  public static class FailedServerException extends IOException {
+    public FailedServerException(String s) {
+      super(s);
+    }
+  }
+
+
   /**
    * set the ping interval value in configuration
    *
@@ -240,6 +306,15 @@ public class HBaseClient {
     tokenHandlers.put(AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.toString(),
         new AuthenticationTokenSelector());
   }
+
+  /**
+   * Creates a connection. Can be overridden by a subclass for testing.
+   * @param remoteId - the ConnectionId to use for the connection creation.
+   */
+  protected Connection createConnection(ConnectionId remoteId) throws IOException {
+    return new Connection(remoteId);
+  }
+
   /** Thread that reads responses and notifies callers.  Each connection owns a
    * socket connected to a remote address.  Calls are multiplexed through this
    * socket: responses may be delivered out of order. */
@@ -263,7 +338,7 @@ public class HBaseClient {
     protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate
if the connection is closed
     protected IOException closeException; // close reason
 
-    public Connection(ConnectionId remoteId) throws IOException {
+    Connection(ConnectionId remoteId) throws IOException {
       if (remoteId.getAddress().isUnresolved()) {
         throw new UnknownHostException("unknown host: " +
                                        remoteId.getAddress().getHostName());
@@ -359,17 +434,30 @@ public class HBaseClient {
 
     /**
      * Add a call to this connection's call queue and notify
-     * a listener; synchronized.
-     * Returns false if called during shutdown.
+     * a listener; synchronized. If the connection is dead, the call is not added, and the
+     * caller is notified.
+     * This function can return a connection that is already marked as 'shouldCloseConnection'
+     *  It is up to the user code to check this status.
      * @param call to add
-     * @return true if the call was added.
      */
-    protected synchronized boolean addCall(Call call) {
-      if (shouldCloseConnection.get())
-        return false;
-      calls.put(call.id, call);
-      notify();
-      return true;
+    protected synchronized void addCall(Call call) {
+      // If the connection is about to close, we manage this as if the call was already added
+      //  to the connection calls list. If not, the connection creations are serialized,
as
+      //  mentioned in HBASE-6364
+      if (this.shouldCloseConnection.get()) {
+        if (this.closeException == null) {
+          call.setException(new IOException(
+              "Call " + call.id + " not added as the connection " + remoteId + " is closing"));
+        } else {
+          call.setException(this.closeException);
+        }
+        synchronized (call) {
+          call.notifyAll();
+        }
+      } else {
+        calls.put(call.id, call);
+        notify();
+      }
     }
 
     /** This class sends a ping to the remote side when timeout on
@@ -682,6 +770,18 @@ public class HBaseClient {
         return;
       }
 
+      if (failedServers.isFailedServer(remoteId.getAddress())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Not trying to connect to " + server +
+              " this server is in the failed servers list");
+        }
+        IOException e = new FailedServerException(
+            "This server is in the failed servers list: " + server);
+        markClosed(e);
+        close();
+        throw e;
+      }
+
       try {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Connecting to "+server);
@@ -698,7 +798,7 @@ public class HBaseClient {
             final InputStream in2 = inStream;
             final OutputStream out2 = outStream;
             UserGroupInformation ticket = remoteId.getTicket().getUGI();
-            if (authMethod == AuthMethod.KERBEROS) {;
+            if (authMethod == AuthMethod.KERBEROS) {
               if (ticket != null && ticket.getRealUser() != null) {
                 ticket = ticket.getRealUser();
               }
@@ -744,6 +844,7 @@ public class HBaseClient {
           return;
         }
       } catch (IOException e) {
+        failedServers.addToFailedServers(remoteId.address);
         markClosed(e);
         close();
 
@@ -1037,7 +1138,6 @@ public class HBaseClient {
   /**
    * Construct an IPC client whose values are of the {@link Message}
    * class.
-   * @param valueClass value class
    * @param conf configuration
    * @param factory socket factory
    */
@@ -1057,6 +1157,7 @@ public class HBaseClient {
     this.clusterId = conf.get(HConstants.CLUSTER_ID, "default");
     this.connections = new PoolMap<ConnectionId, Connection>(
         getPoolType(conf), getPoolSize(conf));
+    this.failedServers = new FailedServers(conf);
   }
 
   /**
@@ -1297,20 +1398,22 @@ public class HBaseClient {
      * refs for keys in HashMap properly. For now its ok.
      */
     ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
-    do {
-      synchronized (connections) {
-        connection = connections.get(remoteId);
-        if (connection == null) {
-          connection = new Connection(remoteId);
-          connections.put(remoteId, connection);
-        }
+    synchronized (connections) {
+      connection = connections.get(remoteId);
+      if (connection == null) {
+        connection = createConnection(remoteId);
+        connections.put(remoteId, connection);
       }
-    } while (!connection.addCall(call));
+    }
+    connection.addCall(call);
 
     //we don't invoke the method below inside "synchronized (connections)"
     //block above. The reason for that is if the server happens to be slow,
     //it will take longer to establish a connection and that will slow the
     //entire system down.
+    //Moreover, if the connection is currently created, there will be many threads
+    // waiting here; as setupIOstreams is synchronized. If the connection fails with a
+    // timeout, they will all fail simultaneously. This is checked in setupIOstreams.
     connection.setupIOstreams();
     return connection;
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdgeManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdgeManager.java?rev=1375473&r1=1375472&r2=1375473&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdgeManager.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdgeManager.java
Tue Aug 21 11:12:10 2012
@@ -50,7 +50,7 @@ public class EnvironmentEdgeManager {
    * Resets the managed instance to the default instance: {@link
    * DefaultEnvironmentEdge}.
    */
-  static void reset() {
+  public static void reset() {
     injectEdge(new DefaultEnvironmentEdge());
   }
 
@@ -60,7 +60,7 @@ public class EnvironmentEdgeManager {
    *
    * @param edge the new edge.
    */
-  static void injectEdge(EnvironmentEdge edge) {
+  public static void injectEdge(EnvironmentEdge edge) {
     if (edge == null) {
       reset();
     } else {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java?rev=1375473&r1=1375472&r2=1375473&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java
Tue Aug 21 11:12:10 2012
@@ -35,6 +35,10 @@ public class ManualEnvironmentEdge imple
     value = newValue;
   }
 
+  public void incValue(long addedValue) {
+    value += addedValue;
+  }
+
   @Override
   public long currentTimeMillis() {
     return this.value;

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java?rev=1375473&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java
(added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java
Tue Aug 21 11:12:10 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.net.InetSocketAddress;
+
+@Category(MediumTests.class)   // Can't be small, we're playing with the EnvironmentEdge
+public class TestHBaseClient {
+
+  @Test
+  public void testFailedServer(){
+    ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(  ee );
+    HBaseClient.FailedServers fs = new HBaseClient.FailedServers(new Configuration());
+
+    InetSocketAddress ia = InetSocketAddress.createUnresolved("bad", 12);
+    InetSocketAddress ia2 = InetSocketAddress.createUnresolved("bad", 12);  // same server
as ia
+    InetSocketAddress ia3 = InetSocketAddress.createUnresolved("badtoo", 12);
+    InetSocketAddress ia4 = InetSocketAddress.createUnresolved("badtoo", 13);
+
+
+    Assert.assertFalse( fs.isFailedServer(ia) );
+
+    fs.addToFailedServers(ia);
+    Assert.assertTrue( fs.isFailedServer(ia) );
+    Assert.assertTrue( fs.isFailedServer(ia2) );
+
+    ee.incValue( 1 );
+    Assert.assertTrue( fs.isFailedServer(ia) );
+    Assert.assertTrue( fs.isFailedServer(ia2) );
+
+    ee.incValue( HBaseClient.FAILED_SERVER_EXPIRY_DEFAULT + 1 );
+    Assert.assertFalse( fs.isFailedServer(ia) );
+    Assert.assertFalse( fs.isFailedServer(ia2) );
+
+    fs.addToFailedServers(ia);
+    fs.addToFailedServers(ia3);
+    fs.addToFailedServers(ia4);
+
+    Assert.assertTrue( fs.isFailedServer(ia) );
+    Assert.assertTrue( fs.isFailedServer(ia2) );
+    Assert.assertTrue( fs.isFailedServer(ia3) );
+    Assert.assertTrue( fs.isFailedServer(ia4) );
+
+    ee.incValue( HBaseClient.FAILED_SERVER_EXPIRY_DEFAULT + 1 );
+    Assert.assertFalse( fs.isFailedServer(ia) );
+    Assert.assertFalse( fs.isFailedServer(ia2) );
+    Assert.assertFalse( fs.isFailedServer(ia3) );
+    Assert.assertFalse( fs.isFailedServer(ia4) );
+
+
+    fs.addToFailedServers(ia3);
+    Assert.assertFalse( fs.isFailedServer(ia4) );
+  }
+}



Mime
View raw message