accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1292024 - in /incubator/accumulo/branches/1.4: docs/config.html src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
Date Tue, 21 Feb 2012 21:09:53 GMT
Author: kturner
Date: Tue Feb 21 21:09:53 2012
New Revision: 1292024

URL: http://svn.apache.org/viewvc?rev=1292024&view=rev
Log:
ACCUMULO-422 Fixed two bugs caused by tserver dying during bulk import

Modified:
    incubator/accumulo/branches/1.4/docs/config.html
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java

Modified: incubator/accumulo/branches/1.4/docs/config.html
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/docs/config.html?rev=1292024&r1=1292023&r2=1292024&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/docs/config.html (original)
+++ incubator/accumulo/branches/1.4/docs/config.html Tue Feb 21 21:09:53 2012
@@ -172,90 +172,83 @@ $HADOOP_HOME/lib/[^.].*.jar,
     <td>The number of attempts to bulk-load a file before giving up.</td>
    </tr>
    <tr >
-    <td>master.bulk.server.max</td>
-    <td><b><a href='#COUNT'>count</a></b></td>
-    <td>yes</td>
-    <td><pre>4</pre></td>
-    <td>The number of servers to use during a bulk load</td>
-   </tr>
-   <tr class='highlight'>
     <td>master.bulk.threadpool.size</td>
     <td><b><a href='#COUNT'>count</a></b></td>
     <td>yes</td>
     <td><pre>5</pre></td>
     <td>The number of threads to use when coordinating a bulk-import.</td>
    </tr>
-   <tr >
+   <tr class='highlight'>
     <td>master.logger.balancer</td>
     <td><b><a href='#CLASSNAME'>java&nbsp;class</a></b></td>
     <td>yes</td>
     <td><pre>org.apache.accumulo.server.master.balancer.SimpleLoggerBalancer</pre></td>
     <td>The balancer class that accumulo will use to make logger assignment decisions.</td>
    </tr>
-   <tr class='highlight'>
+   <tr >
     <td>master.port.client</td>
     <td><b><a href='#PORT'>port</a></b></td>
     <td>yes but requires restart</td>
     <td><pre>9999</pre></td>
     <td>The port used for handling client connections on the master</td>
    </tr>
-   <tr >
+   <tr class='highlight'>
     <td>master.recovery.max.age</td>
     <td><b><a href='#TIMEDURATION'>duration</a></b></td>
     <td>yes</td>
     <td><pre>60m</pre></td>
     <td>Recovery files older than this age will be removed.</td>
    </tr>
-   <tr class='highlight'>
+   <tr >
     <td>master.recovery.pool</td>
     <td><b><a href='#STRING'>string</a></b></td>
     <td>yes</td>
     <td><pre>recovery</pre></td>
     <td>Priority queue to use for log recovery map/reduce jobs.</td>
    </tr>
-   <tr >
+   <tr class='highlight'>
     <td>master.recovery.queue</td>
     <td><b><a href='#STRING'>string</a></b></td>
     <td>yes</td>
     <td><pre>default</pre></td>
     <td>Priority queue to use for log recovery map/reduce jobs.</td>
    </tr>
-   <tr class='highlight'>
+   <tr >
     <td>master.recovery.reducers</td>
     <td><b><a href='#COUNT'>count</a></b></td>
     <td>yes</td>
     <td><pre>10</pre></td>
     <td>Number of reducers to use to sort recovery logs (per log)</td>
    </tr>
-   <tr >
+   <tr class='highlight'>
     <td>master.recovery.sort.mapreduce</td>
     <td><b><a href='#BOOLEAN'>boolean</a></b></td>
     <td>yes</td>
     <td><pre>false</pre></td>
     <td>If true, use map/reduce to sort write-ahead logs during recovery</td>
    </tr>
-   <tr class='highlight'>
+   <tr >
     <td>master.recovery.time.max</td>
     <td><b><a href='#TIMEDURATION'>duration</a></b></td>
     <td>yes</td>
     <td><pre>30m</pre></td>
     <td>The maximum time to attempt recovery before giving up</td>
    </tr>
-   <tr >
+   <tr class='highlight'>
     <td>master.server.threadcheck.time</td>
     <td><b><a href='#TIMEDURATION'>duration</a></b></td>
     <td>yes</td>
     <td><pre>1s</pre></td>
     <td>The time between adjustments of the server thread pool.</td>
    </tr>
-   <tr class='highlight'>
+   <tr >
     <td>master.server.threads.minimum</td>
     <td><b><a href='#COUNT'>count</a></b></td>
     <td>yes</td>
     <td><pre>2</pre></td>
     <td>The minimum number of threads to use to handle incoming requests.</td>
    </tr>
-   <tr >
+   <tr class='highlight'>
     <td>master.tablet.balancer</td>
     <td><b><a href='#CLASSNAME'>java&nbsp;class</a></b></td>
     <td>yes</td>

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1292024&r1=1292023&r2=1292024&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Tue Feb 21 21:09:53 2012
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -57,6 +58,7 @@ import org.apache.accumulo.server.Server
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fate.Repo;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
@@ -71,6 +73,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
 
 import cloudtrace.instrument.TraceExecutorService;
 
@@ -110,7 +113,6 @@ public class BulkImport extends MasterRe
     this.sourceDir = sourceDir;
     this.errorDir = errorDir;
     this.setTime = setTime;
-    log.debug(this.getDescription());
   }
   
   @Override
@@ -283,7 +285,8 @@ class CleanUpBulkImport extends MasterRe
     Set<TServerInstance> running = master.onlineTabletServers();
     for (TServerInstance server : running) {
       try {
-        if (!master.getConnection(server).isActive(tid))
+        TServerConnection client = master.getConnection(server);
+        if (client != null && !client.isActive(tid))
           finished.add(server);
       } catch (TException ex) {
        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
@@ -374,6 +377,13 @@ class LoadFiles extends MasterRepo {
   }
   
   @Override
+  public long isReady(long tid, Master master) throws Exception {
+    if (master.onlineTabletServers().size() == 0)
+      return 500;
+    return 0;
+  }
+  
+  @Override
   public Repo<Master> call(final long tid, Master master) throws Exception {
     final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
     FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
@@ -383,7 +393,7 @@ class LoadFiles extends MasterRepo {
       files.add(entry);
     }
     log.debug("tid " + tid + " importing " + files.size() + " files");
-    
+
     Path writable = new Path(this.errorDir, ".iswritable");
     if (!fs.createNewFile(writable)) {
       // Maybe this is a re-try... clear the flag and try again
@@ -399,16 +409,22 @@ class LoadFiles extends MasterRepo {
       filesToLoad.add(f.getPath().toString());
     
 
+    final Map<String,Long> blackList = Collections.synchronizedMap(new HashMap<String,Long>());
+
     final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
       List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
       
       // Figure out which files will be sent to which server
-      Set<TServerInstance> currentServers = Collections.synchronizedSet(new HashSet<TServerInstance>(master.onlineTabletServers()));
-      Map<String,List<String>> loadAssignments = new HashMap<String,List<String>>();
-      for (TServerInstance server : currentServers) {
-        loadAssignments.put(server.hostPort(), new ArrayList<String>());
+      Map<String,List<String>> loadAssignments = initializeLoadAssignments(tid, master, conf, blackList);
+      if (loadAssignments.size() == 0)
+        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
+      
+      while (loadAssignments.size() == 0) {
+        UtilWaitThread.sleep(500);
+        loadAssignments = initializeLoadAssignments(tid, master, conf, blackList);
       }
+
       int i = 0;
       List<Entry<String,List<String>>> entries = new ArrayList<Entry<String,List<String>>>(loadAssignments.entrySet());
       for (String file : filesToLoad) {
@@ -442,8 +458,11 @@ class LoadFiles extends MasterRepo {
                   failures.addAll(fail);
                 }
               }
+            } catch (TTransportException tte) {
+              log.warn("blacklisting server " + finalEntry.getKey() + " tid " + tid + " " + tte, tte);
+              blackList.put(finalEntry.getKey(), System.currentTimeMillis());
             } catch (Exception ex) {
-              log.error(ex, ex);
+              log.error("rpc failed, server " + finalEntry.getKey() + " tid " + tid + " " + ex, ex);
             } finally {
               ServerClient.close(client);
             }
@@ -455,7 +474,7 @@ class LoadFiles extends MasterRepo {
       for (Future<List<String>> f : results)
         failures.addAll(f.get());
       if (filesToLoad.size() > 0) {
-        log.debug("tid " + tid + " attempt " + (i + 1) + " " + sampleList(filesToLoad, 10) + " failed");
+        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
         UtilWaitThread.sleep(100);
       }
     }
@@ -480,6 +499,29 @@ class LoadFiles extends MasterRepo {
     return new CompleteBulkImport(tableId, source, bulk, errorDir);
   }
   
+  private Map<String,List<String>> initializeLoadAssignments(final long tid, Master master, final SiteConfiguration conf, final Map<String,Long> blackList) {
+    
+    // remove servers from black list that have been there a while
+    Iterator<Entry<String,Long>> bliter = blackList.entrySet().iterator();
+    long zkTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
+    while (bliter.hasNext()) {
+      Entry<String,Long> blentry = bliter.next();
+      if (System.currentTimeMillis() - blentry.getValue() > zkTimeout * 2) {
+        log.debug("Removing server from blacklist " + blentry.getKey() + " tid " + tid);
+        bliter.remove();
+      }
+    }
+    
+    Set<TServerInstance> currentServers = new HashSet<TServerInstance>(master.onlineTabletServers());
+    Map<String,List<String>> loadAssignments = new HashMap<String,List<String>>();
+    for (TServerInstance server : currentServers) {
+      loadAssignments.put(server.hostPort(), new ArrayList<String>());
+    }
+
+    loadAssignments.keySet().removeAll(blackList.keySet());
+    return loadAssignments;
+  }
+  
   static String sampleList(Collection<?> potentiallyLongList, int max) {
     StringBuffer result = new StringBuffer();
     result.append("[");



Mime
View raw message