accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1292032 - in /incubator/accumulo/trunk: ./ src/core/ src/server/ src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
Date Tue, 21 Feb 2012 21:14:47 GMT
Author: kturner
Date: Tue Feb 21 21:14:46 2012
New Revision: 1292032

URL: http://svn.apache.org/viewvc?rev=1292032&view=rev
Log:
ACCUMULO-422 Fixed two bugs caused by tserver dying during bulk import (merged from 1.4)

Modified:
    incubator/accumulo/trunk/   (props changed)
    incubator/accumulo/trunk/src/core/   (props changed)
    incubator/accumulo/trunk/src/server/   (props changed)
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java

Propchange: incubator/accumulo/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 21 21:14:46 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873,1245632
 /incubator/accumulo/branches/1.3.5rc:1209938
-/incubator/accumulo/branches/1.4:1201902-1245824
+/incubator/accumulo/branches/1.4:1201902-1292029

Propchange: incubator/accumulo/trunk/src/core/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 21 21:14:46 2012
@@ -1,3 +1,3 @@
-/incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215
 /incubator/accumulo/branches/1.3.5rc/src/core:1209938
-/incubator/accumulo/branches/1.4/src/core:1201902-1245824
+/incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215
+/incubator/accumulo/branches/1.4/src/core:1201902-1292029

Propchange: incubator/accumulo/trunk/src/server/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 21 21:14:46 2012
@@ -1,3 +1,3 @@
-/incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611
 /incubator/accumulo/branches/1.3.5rc/src/server:1209938
-/incubator/accumulo/branches/1.4/src/server:1201902-1245824
+/incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611
+/incubator/accumulo/branches/1.4/src/server:1201902-1292029

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1292032&r1=1292031&r2=1292032&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Tue Feb 21 21:14:46 2012
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -57,6 +58,7 @@ import org.apache.accumulo.server.Server
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fate.Repo;
+import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
@@ -71,6 +73,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
 
 import cloudtrace.instrument.TraceExecutorService;
 
@@ -110,7 +113,6 @@ public class BulkImport extends MasterRe
     this.sourceDir = sourceDir;
     this.errorDir = errorDir;
     this.setTime = setTime;
-    log.debug(this.getDescription());
   }
   
   @Override
@@ -283,7 +285,8 @@ class CleanUpBulkImport extends MasterRe
     Set<TServerInstance> running = master.onlineTabletServers();
     for (TServerInstance server : running) {
       try {
-        if (!master.getConnection(server).isActive(tid))
+        TServerConnection client = master.getConnection(server);
+        if (client != null && !client.isActive(tid))
           finished.add(server);
       } catch (TException ex) {
         log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
@@ -374,6 +377,13 @@ class LoadFiles extends MasterRepo {
   }
   
   @Override
+  public long isReady(long tid, Master master) throws Exception {
+    if (master.onlineTabletServers().size() == 0)
+      return 500;
+    return 0;
+  }
+  
+  @Override
   public Repo<Master> call(final long tid, Master master) throws Exception {
     final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
     FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
@@ -383,7 +393,7 @@ class LoadFiles extends MasterRepo {
       files.add(entry);
     }
     log.debug("tid " + tid + " importing " + files.size() + " files");
-    
+
     Path writable = new Path(this.errorDir, ".iswritable");
     if (!fs.createNewFile(writable)) {
       // Maybe this is a re-try... clear the flag and try again
@@ -399,16 +409,22 @@ class LoadFiles extends MasterRepo {
       filesToLoad.add(f.getPath().toString());
     
 
+    final Map<String,Long> blackList = Collections.synchronizedMap(new HashMap<String,Long>());
+
     final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
     for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
       List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
       
       // Figure out which files will be sent to which server
-      Set<TServerInstance> currentServers = Collections.synchronizedSet(new HashSet<TServerInstance>(master.onlineTabletServers()));
-      Map<String,List<String>> loadAssignments = new HashMap<String,List<String>>();
-      for (TServerInstance server : currentServers) {
-        loadAssignments.put(server.hostPort(), new ArrayList<String>());
+      Map<String,List<String>> loadAssignments = initializeLoadAssignments(tid, master, conf, blackList);
+      if (loadAssignments.size() == 0)
+        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
+      
+      while (loadAssignments.size() == 0) {
+        UtilWaitThread.sleep(500);
+        loadAssignments = initializeLoadAssignments(tid, master, conf, blackList);
       }
+
       int i = 0;
       List<Entry<String,List<String>>> entries = new ArrayList<Entry<String,List<String>>>(loadAssignments.entrySet());
       for (String file : filesToLoad) {
@@ -442,8 +458,11 @@ class LoadFiles extends MasterRepo {
                   failures.addAll(fail);
                 }
               }
+            } catch (TTransportException tte) {
+              log.warn("blacklisting server " + finalEntry.getKey() + " tid " + tid + " " + tte, tte);
+              blackList.put(finalEntry.getKey(), System.currentTimeMillis());
             } catch (Exception ex) {
-              log.error(ex, ex);
+              log.error("rpc failed, server " + finalEntry.getKey() + " tid " + tid + " " + ex, ex);
             } finally {
               ServerClient.close(client);
             }
@@ -455,7 +474,7 @@ class LoadFiles extends MasterRepo {
       for (Future<List<String>> f : results)
         failures.addAll(f.get());
       if (filesToLoad.size() > 0) {
-        log.debug("tid " + tid + " attempt " + (i + 1) + " " + sampleList(filesToLoad, 10) + " failed");
+        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
         UtilWaitThread.sleep(100);
       }
     }
@@ -480,6 +499,29 @@ class LoadFiles extends MasterRepo {
     return new CompleteBulkImport(tableId, source, bulk, errorDir);
   }
   
+  private Map<String,List<String>> initializeLoadAssignments(final long tid, Master master, final SiteConfiguration conf, final Map<String,Long> blackList) {
+    
+    // remove servers from black list that have been there a while
+    Iterator<Entry<String,Long>> bliter = blackList.entrySet().iterator();
+    long zkTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
+    while (bliter.hasNext()) {
+      Entry<String,Long> blentry = bliter.next();
+      if (System.currentTimeMillis() - blentry.getValue() > zkTimeout * 2) {
+        log.debug("Removing server from blacklist " + blentry.getKey() + " tid " + tid);
+        bliter.remove();
+      }
+    }
+    
+    Set<TServerInstance> currentServers = new HashSet<TServerInstance>(master.onlineTabletServers());
+    Map<String,List<String>> loadAssignments = new HashMap<String,List<String>>();
+    for (TServerInstance server : currentServers) {
+      loadAssignments.put(server.hostPort(), new ArrayList<String>());
+    }
+
+    loadAssignments.keySet().removeAll(blackList.keySet());
+    return loadAssignments;
+  }
+  
   static String sampleList(Collection<?> potentiallyLongList, int max) {
     StringBuffer result = new StringBuffer();
     result.append("[");



Mime
View raw message