incubator-accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1293435 - in /incubator/accumulo/branches/1.4/src: core/src/main/java/org/apache/accumulo/core/client/impl/ server/src/main/java/org/apache/accumulo/server/master/tableOps/ server/src/main/java/org/apache/accumulo/server/test/randomwalk/
Date Fri, 24 Feb 2012 21:38:18 GMT
Author: kturner
Date: Fri Feb 24 21:38:18 2012
New Revision: 1293435

URL: http://svn.apache.org/viewvc?rev=1293435&view=rev
Log:
ACCUMULO-422
 * Spread bulk imports evenly across tservers
 * Closed all cached connections when a server connection has errors
 * Fixed random walk reporting of stuck nodes

Modified:
    incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
    incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Node.java

Modified: incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java?rev=1293435&r1=1293434&r2=1293435&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java (original)
+++ incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java Fri Feb 24 21:38:18 2012
@@ -120,6 +120,10 @@ public class ServerClient {
   static volatile boolean warnedAboutTServersBeingDown = false;
 
   public static Pair<String,ClientService.Iface> getConnection(Instance instance) throws
TTransportException {
+    return getConnection(instance, true);
+  }
+  
+  public static Pair<String,ClientService.Iface> getConnection(Instance instance, boolean
preferCachedConnections) throws TTransportException {
     ArgumentChecker.notNull(instance);
     // create list of servers
     ArrayList<ThriftTransportKey> servers = new ArrayList<ThriftTransportKey>();
@@ -138,7 +142,7 @@ public class ServerClient {
     
     boolean opened = false;
     try {
-      Pair<String,TTransport> pair = ThriftTransportPool.getInstance().getAnyTransport(servers);
+      Pair<String,TTransport> pair = ThriftTransportPool.getInstance().getAnyTransport(servers,
preferCachedConnections);
       ClientService.Iface client = ThriftUtil.createClient(new ClientService.Client.Factory(),
pair.getSecond());
       opened = true;
       warnedAboutTServersBeingDown = false;

Modified: incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java?rev=1293435&r1=1293434&r2=1293435&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java (original)
+++ incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java Fri Feb 24 21:38:18 2012
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.SecurityPermission;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -373,20 +374,30 @@ public class ThriftTransportPool {
     return getTransport(location, port, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
   }
   
-  Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers) throws
TTransportException {
+  Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean
preferCachedConnection) throws TTransportException {
     
     servers = new ArrayList<ThriftTransportKey>(servers);
     
-    synchronized (this) {
-      // atomically find an available location that is in the list
-      for (Entry<ThriftTransportKey,List<CachedConnection>> entry : cache.entrySet())
{
-        if (servers.contains(entry.getKey())) {
-          for (CachedConnection cachedConnection : entry.getValue()) {
-            if (!cachedConnection.isReserved()) {
-              cachedConnection.setReserved(true);
-              if (log.isTraceEnabled())
-                log.trace("Using existing connection to " + entry.getKey().getLocation()
+ ":" + entry.getKey().getPort());
-              return new Pair<String,TTransport>(entry.getKey().getLocation() + ":"
+ entry.getKey().getPort(), cachedConnection.transport);
+    if (preferCachedConnection) {
+      HashSet<ThriftTransportKey> serversSet = new HashSet<ThriftTransportKey>(servers);
+      
+      synchronized (this) {
+        
+        // randomly pick a server from the connection cache
+        serversSet.retainAll(cache.keySet());
+        
+        if (serversSet.size() > 0) {
+          ArrayList<ThriftTransportKey> cachedServers = new ArrayList<ThriftTransportKey>(serversSet);
+          Collections.shuffle(cachedServers, random);
+          
+          for (ThriftTransportKey ttk : cachedServers) {
+            for (CachedConnection cachedConnection : cache.get(ttk)) {
+              if (!cachedConnection.isReserved()) {
+                cachedConnection.setReserved(true);
+                if (log.isTraceEnabled())
+                  log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort());
+                return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(),
cachedConnection.transport);
+              }
             }
           }
         }
@@ -396,8 +407,26 @@ public class ThriftTransportPool {
     int retryCount = 0;
     while (servers.size() > 0 && retryCount < 10) {
       int index = random.nextInt(servers.size());
+      ThriftTransportKey ttk = servers.get(index);
+      
+      if (!preferCachedConnection) {
+        synchronized (this) {
+          List<CachedConnection> cachedConnList = cache.get(ttk);
+          if (cachedConnList != null) {
+            for (CachedConnection cachedConnection : cachedConnList) {
+              if (!cachedConnection.isReserved()) {
+                cachedConnection.setReserved(true);
+                if (log.isTraceEnabled())
+                  log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort());
+                return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(),
cachedConnection.transport);
+              }
+            }
+          }
+        }
+      }
+
       try {
-        return new Pair<String,TTransport>(servers.get(index).getLocation() + ":" +
servers.get(index).getPort(), createNewTransport(servers.get(index)));
+        return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(),
createNewTransport(ttk));
       } catch (TTransportException tte) {
         log.debug("Failed to connect to " + servers.get(index), tte);
         servers.remove(index);
@@ -478,13 +507,15 @@ public class ThriftTransportPool {
     boolean existInCache = false;
     CachedTTransport ctsc = (CachedTTransport) tsc;
     
+    ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
+
     synchronized (this) {
       List<CachedConnection> ccl = cache.get(ctsc.getCacheKey());
       for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();)
{
         CachedConnection cachedConnection = iterator.next();
         if (cachedConnection.transport == tsc) {
           if (ctsc.sawError) {
-            tsc.close();
+            closeList.add(cachedConnection);
             iterator.remove();
             
             if (log.isTraceEnabled())
@@ -520,6 +551,26 @@ public class ThriftTransportPool {
           break;
         }
       }
+      
+      // remove all unreserved cached connection when a sever has an error, not just the
connection that was returned
+      if (ctsc.sawError) {
+        for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();)
{
+          CachedConnection cachedConnection = iterator.next();
+          if (!cachedConnection.isReserved()) {
+            closeList.add(cachedConnection);
+            iterator.remove();
+          }
+        }
+      }
+    }
+    
+    // close outside of sync block
+    for (CachedConnection cachedConnection : closeList) {
+      try {
+        cachedConnection.transport.close();
+      } catch (Exception e) {
+        log.debug("Failed to close connection w/ errors", e);
+      }
     }
     
     if (!existInCache) {

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1293435&r1=1293434&r2=1293435&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Fri Feb 24 21:38:18 2012
@@ -424,7 +424,10 @@ class LoadFiles extends MasterRepo {
             ClientService.Iface client = null;
             String server = null;
             try {
-              Pair<String,Iface> pair = ServerClient.getConnection(master.getInstance());
+              // get a connection to a random tablet server, do not prefer cached connections
because
+              // this is running on the master and there are lots of connections to tablet
servers
+              // serving the !METADATA tablets
+              Pair<String,Iface> pair = ServerClient.getConnection(master.getInstance(),
false);
               client = pair.getSecond();
               server = pair.getFirst();
               List<String> attempt = Collections.singletonList(file);

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java?rev=1293435&r1=1293434&r2=1293435&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java Fri Feb 24 21:38:18 2012
@@ -162,7 +162,7 @@ public class Module extends Node {
     this.xmlFile = xmlFile;
     loadFromXml();
   }
-  
+
   @Override
   public void visit(State state, Properties props) throws Exception {
     int maxHops, maxSec;
@@ -286,24 +286,19 @@ public class Module extends Node {
   private void startTimer(final Node initNode) {
     runningLong.set(false);
     timer = new Thread(new Runnable() {
-
       @Override
       public void run() {
-        while (!runningLong.get()) {
-          try {
-            systemTime = System.currentTimeMillis();
-            synchronized (timer) {
-              timer.wait(time);
-            }
-          } catch (InterruptedException ie) {
-            return;
-          }
+        try {
+          systemTime = System.currentTimeMillis();
+          Thread.sleep(time);
+        } catch (InterruptedException ie) {
+          return;
         }
         long timeSinceLastProgress = System.currentTimeMillis() - initNode.lastProgress();
         if (timeSinceLastProgress > time) {
           log.warn("Node " + initNode + " has been running for " + timeSinceLastProgress
/ 1000.0 + " seconds. You may want to look into it.");
+          runningLong.set(true);
         }
-        runningLong.set(true);
       }
     });
     initNode.makingProgress();

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Node.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Node.java?rev=1293435&r1=1293434&r2=1293435&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Node.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Node.java Fri Feb 24 21:38:18 2012
@@ -43,6 +43,11 @@ public abstract class Node {
   }
   
   @Override
+  public String toString() {
+    return this.getClass().getName();
+  }
+  
+  @Override
   public int hashCode() {
     return toString().hashCode();
   }



Mime
View raw message