incubator-accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1292512 - in /incubator/accumulo/trunk: ./ src/core/ src/core/src/main/java/org/apache/accumulo/core/client/impl/ src/server/ src/server/src/main/java/org/apache/accumulo/server/master/tableOps/
Date Wed, 22 Feb 2012 21:18:52 GMT
Author: kturner
Date: Wed Feb 22 21:18:51 2012
New Revision: 1292512

URL: http://svn.apache.org/viewvc?rev=1292512&view=rev
Log:
ACCUMULO-422 Fixed bug caused by tserver dying during bulk import (merged from 1.4)

Modified:
    incubator/accumulo/trunk/   (props changed)
    incubator/accumulo/trunk/src/core/   (props changed)
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
    incubator/accumulo/trunk/src/server/   (props changed)
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java

Propchange: incubator/accumulo/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 22 21:18:51 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873,1245632
 /incubator/accumulo/branches/1.3.5rc:1209938
-/incubator/accumulo/branches/1.4:1201902-1292413
+/incubator/accumulo/branches/1.4:1201902-1292511

Propchange: incubator/accumulo/trunk/src/core/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 22 21:18:51 2012
@@ -1,3 +1,3 @@
-/incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215
 /incubator/accumulo/branches/1.3.5rc/src/core:1209938
-/incubator/accumulo/branches/1.4/src/core:1201902-1292413
+/incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215
+/incubator/accumulo/branches/1.4/src/core:1201902-1292511

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java?rev=1292512&r1=1292511&r2=1292512&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java Wed Feb 22 21:18:51 2012
@@ -25,9 +25,11 @@ import org.apache.accumulo.core.client.A
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.thrift.ClientService;
+import org.apache.accumulo.core.client.impl.thrift.ClientService.Iface;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.ThriftUtil;
@@ -79,11 +81,14 @@ public class ServerClient {
   public static <T> T executeRaw(Instance instance, ClientExecReturn<T,ClientService.Iface>
exec) throws Exception {
     while (true) {
       ClientService.Iface client = null;
+      String server = null;
       try {
-        client = ServerClient.getConnection(instance);
+        Pair<String,Iface> pair = ServerClient.getConnection(instance);
+        server = pair.getFirst();
+        client = pair.getSecond();
         return exec.execute(client);
       } catch (TTransportException tte) {
-        log.debug("ClientService request failed, retrying ... ", tte);
+        log.debug("ClientService request failed " + server + ", retrying ... ", tte);
         UtilWaitThread.sleep(100);
       } finally {
         if (client != null)
@@ -95,12 +100,15 @@ public class ServerClient {
   public static void executeRaw(Instance instance, ClientExec<ClientService.Iface>
exec) throws Exception {
     while (true) {
       ClientService.Iface client = null;
+      String server = null;
       try {
-        client = ServerClient.getConnection(instance);
+        Pair<String,Iface> pair = ServerClient.getConnection(instance);
+        server = pair.getFirst();
+        client = pair.getSecond();
         exec.execute(client);
         break;
       } catch (TTransportException tte) {
-        log.debug("ClientService request failed, retrying ... ", tte);
+        log.debug("ClientService request failed " + server + ", retrying ... ", tte);
         UtilWaitThread.sleep(100);
       } finally {
         if (client != null)
@@ -111,7 +119,7 @@ public class ServerClient {
   
   static volatile boolean warnedAboutTServersBeingDown = false;
 
-  public static ClientService.Iface getConnection(Instance instance) throws TTransportException
{
+  public static Pair<String,ClientService.Iface> getConnection(Instance instance) throws
TTransportException {
     ArgumentChecker.notNull(instance);
     // create list of servers
     ArrayList<ThriftTransportKey> servers = new ArrayList<ThriftTransportKey>();
@@ -130,11 +138,11 @@ public class ServerClient {
     
     boolean opened = false;
     try {
-      TTransport socket = ThriftTransportPool.getInstance().getAnyTransport(servers);
-      ClientService.Iface client = ThriftUtil.createClient(new ClientService.Client.Factory(),
socket);
+      Pair<String,TTransport> pair = ThriftTransportPool.getInstance().getAnyTransport(servers);
+      ClientService.Iface client = ThriftUtil.createClient(new ClientService.Client.Factory(),
pair.getSecond());
       opened = true;
       warnedAboutTServersBeingDown = false;
-      return client;
+      return new Pair<String,ClientService.Iface>(pair.getFirst(), client);
     } finally {
       if (!opened) {
         if (!warnedAboutTServersBeingDown) {

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java?rev=1292512&r1=1292511&r2=1292512&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java Wed Feb 22 21:18:51 2012
@@ -35,6 +35,7 @@ import org.apache.accumulo.core.conf.Acc
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TTimeoutTransport;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.log4j.Logger;
@@ -372,7 +373,7 @@ public class ThriftTransportPool {
     return getTransport(location, port, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
   }
   
-  TTransport getAnyTransport(List<ThriftTransportKey> servers) throws TTransportException
{
+  Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers) throws
TTransportException {
     
     servers = new ArrayList<ThriftTransportKey>(servers);
     
@@ -385,7 +386,7 @@ public class ThriftTransportPool {
               cachedConnection.setReserved(true);
               if (log.isTraceEnabled())
                 log.trace("Using existing connection to " + entry.getKey().getLocation()
+ ":" + entry.getKey().getPort());
-              return cachedConnection.transport;
+              return new Pair<String,TTransport>(entry.getKey().getLocation() + ":"
+ entry.getKey().getPort(), cachedConnection.transport);
             }
           }
         }
@@ -396,7 +397,7 @@ public class ThriftTransportPool {
     while (servers.size() > 0 && retryCount < 10) {
       int index = random.nextInt(servers.size());
       try {
-        return createNewTransport(servers.get(index));
+        return new Pair<String,TTransport>(servers.get(index).getLocation() + ":" +
servers.get(index).getPort(), createNewTransport(servers.get(index)));
       } catch (TTransportException tte) {
         log.debug("Failed to connect to " + servers.get(index), tte);
         servers.remove(index);

Propchange: incubator/accumulo/trunk/src/server/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 22 21:18:51 2012
@@ -1,3 +1,3 @@
-/incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611
 /incubator/accumulo/branches/1.3.5rc/src/server:1209938
-/incubator/accumulo/branches/1.4/src/server:1201902-1292413
+/incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611
+/incubator/accumulo/branches/1.4/src/server:1201902-1292511

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1292512&r1=1292511&r2=1292512&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Wed Feb 22 21:18:51 2012
@@ -21,12 +21,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -42,6 +38,7 @@ import org.apache.accumulo.core.client.I
 import org.apache.accumulo.core.client.impl.ServerClient;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.thrift.ClientService;
+import org.apache.accumulo.core.client.impl.thrift.ClientService.Iface;
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
@@ -52,7 +49,7 @@ import org.apache.accumulo.core.master.s
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -73,7 +70,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
 
 import cloudtrace.instrument.TraceExecutorService;
 
@@ -384,7 +380,7 @@ class LoadFiles extends MasterRepo {
   }
   
   @Override
-  public Repo<Master> call(final long tid, Master master) throws Exception {
+  public Repo<Master> call(final long tid, final Master master) throws Exception {
     final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
     FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
         ServerConfiguration.getSiteConfiguration()));
@@ -408,61 +404,39 @@ class LoadFiles extends MasterRepo {
     for (FileStatus f : files)
       filesToLoad.add(f.getPath().toString());
     
-
-    final Map<String,Long> blackList = Collections.synchronizedMap(new HashMap<String,Long>());
-
     final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
     for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++)
{
       List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
       
-      // Figure out which files will be sent to which server
-      Map<String,List<String>> loadAssignments = initializeLoadAssignments(tid,
master, conf, blackList);
-      if (loadAssignments.size() == 0)
+      if (master.onlineTabletServers().size() == 0)
         log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid
+ ")");
       
-      while (loadAssignments.size() == 0) {
+      while (master.onlineTabletServers().size() == 0) {
         UtilWaitThread.sleep(500);
-        loadAssignments = initializeLoadAssignments(tid, master, conf, blackList);
-      }
-
-      int i = 0;
-      List<Entry<String,List<String>>> entries = new ArrayList<Entry<String,List<String>>>(loadAssignments.entrySet());
-      for (String file : filesToLoad) {
-        entries.get(i % entries.size()).getValue().add(file);
-        i++;
       }
       
       // Use the threadpool to assign files one-at-a-time to the server
-      for (Entry<String,List<String>> entry : entries) {
-        if (entry.getValue().isEmpty()) {
-          continue;
-        }
-        final Entry<String,List<String>> finalEntry = entry;
+      for (final String file : filesToLoad) {
         results.add(threadPool.submit(new Callable<List<String>>() {
           @Override
           public List<String> call() {
-            if (log.isDebugEnabled()) {
-              log.debug("Asking " + finalEntry.getKey() + " to load " + sampleList(finalEntry.getValue(),
10));
-            }
             List<String> failures = new ArrayList<String>();
             ClientService.Iface client = null;
+            String server = null;
             try {
-              client = ThriftUtil.getTServerClient(finalEntry.getKey(), conf);
-              for (String file : finalEntry.getValue()) {
-                List<String> attempt = Collections.singletonList(file);
-                log.debug("Asking " + finalEntry.getKey() + " to bulk import " + file);
-                List<String> fail = client.bulkImportFiles(null, SecurityConstants.getSystemCredentials(),
tid, tableId, attempt, errorDir, setTime);
-                if (fail.isEmpty()) {
-                  filesToLoad.remove(file);
-                } else {
-                  failures.addAll(fail);
-                }
+              Pair<String,Iface> pair = ServerClient.getConnection(master.getInstance());
+              client = pair.getSecond();
+              server = pair.getFirst();
+              List<String> attempt = Collections.singletonList(file);
+              log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
+              List<String> fail = client.bulkImportFiles(null, SecurityConstants.getSystemCredentials(),
tid, tableId, attempt, errorDir, setTime);
+              if (fail.isEmpty()) {
+                filesToLoad.remove(file);
+              } else {
+                failures.addAll(fail);
               }
-            } catch (TTransportException tte) {
-              log.warn("blacklisting server " + finalEntry.getKey() + " tid " + tid + " "
+ tte, tte);
-              blackList.put(finalEntry.getKey(), System.currentTimeMillis());
             } catch (Exception ex) {
-              log.error("rpc failed, server " + finalEntry.getKey() + " tid " + tid + " "
+ ex, ex);
+              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex, ex);
             } finally {
               ServerClient.close(client);
             }
@@ -499,29 +473,6 @@ class LoadFiles extends MasterRepo {
     return new CompleteBulkImport(tableId, source, bulk, errorDir);
   }
   
-  private Map<String,List<String>> initializeLoadAssignments(final long tid,
Master master, final SiteConfiguration conf, final Map<String,Long> blackList) {
-    
-    // remove servers from black list that have been there a while
-    Iterator<Entry<String,Long>> bliter = blackList.entrySet().iterator();
-    long zkTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
-    while (bliter.hasNext()) {
-      Entry<String,Long> blentry = bliter.next();
-      if (System.currentTimeMillis() - blentry.getValue() > zkTimeout * 2) {
-        log.debug("Removing server from blacklist " + blentry.getKey() + " tid " + tid);
-        bliter.remove();
-      }
-    }
-    
-    Set<TServerInstance> currentServers = new HashSet<TServerInstance>(master.onlineTabletServers());
-    Map<String,List<String>> loadAssignments = new HashMap<String,List<String>>();
-    for (TServerInstance server : currentServers) {
-      loadAssignments.put(server.hostPort(), new ArrayList<String>());
-    }
-
-    loadAssignments.keySet().removeAll(blackList.keySet());
-    return loadAssignments;
-  }
-  
   static String sampleList(Collection<?> potentiallyLongList, int max) {
     StringBuffer result = new StringBuffer();
     result.append("[");



Mime
View raw message