accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1457189 - /accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/util/Admin.java
Date Sat, 16 Mar 2013 00:54:41 GMT
Author: kturner
Date: Sat Mar 16 00:54:41 2013
New Revision: 1457189

URL: http://svn.apache.org/r1457189
Log:
ACCUMULO-892 made stopAll flush tables

Modified:
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/util/Admin.java

Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/util/Admin.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/util/Admin.java?rev=1457189&r1=1457188&r2=1457189&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/util/Admin.java
(original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/util/Admin.java
Sat Mar 16 00:54:41 2013
@@ -19,10 +19,15 @@ package org.apache.accumulo.server.util;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.impl.ClientExec;
 import org.apache.accumulo.core.client.impl.MasterClient;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
@@ -97,6 +102,10 @@ public class Admin {
         stopTabletServer(instance, CredentialHelper.create(principal, token, instance.getInstanceID()),
stopOpts.args, opts.force);
       } else {
         everything = cl.getParsedCommand().equals("stopAll");
+
+        if (everything)
+          flushAll(instance, principal, token);
+
         stopServer(instance, CredentialHelper.create(principal, token, instance.getInstanceID()),
everything);
       }
     } catch (AccumuloException e) {
@@ -107,7 +116,60 @@ public class Admin {
       System.exit(2);
     }
   }
-  
+
+  /**
+   * flushing during shutdown is a perfomance optimization, its not required. The method
will make an attempt to initiate flushes of all tables and give up if
+   * it takes too long.
+   * 
+   */
+  private static void flushAll(final Instance instance, final String principal, final AuthenticationToken
token) throws AccumuloException,
+      AccumuloSecurityException {
+    
+    final AtomicInteger flushesStarted = new AtomicInteger(0);
+
+    Runnable flushTask = new Runnable() {
+      
+      @Override
+      public void run() {
+        try {
+          Connector conn = instance.getConnector(principal, token);
+          Set<String> tables = conn.tableOperations().tableIdMap().keySet();
+          for (String table : tables) {
+            if (table.equals(Constants.METADATA_TABLE_NAME))
+              continue;
+            try {
+              conn.tableOperations().flush(table, null, null, false);
+              flushesStarted.incrementAndGet();
+            } catch (TableNotFoundException e) {}
+          }
+        } catch (Exception e) {
+          log.warn("Failed to intiate flush " + e.getMessage());
+        }
+      }
+    };
+    
+    Thread flusher = new Thread(flushTask);
+    flusher.setDaemon(true);
+    flusher.start();
+    
+    long start = System.currentTimeMillis();
+    try {
+      flusher.join(3000);
+    } catch (InterruptedException e) {}
+    
+    while (flusher.isAlive() && System.currentTimeMillis() - start < 15000) {
+      int flushCount = flushesStarted.get();
+      try {
+        flusher.join(1000);
+      } catch (InterruptedException e) {}
+      
+      if (flushCount == flushesStarted.get()) {
+        // no progress was made while waiting for join... maybe its stuck, stop waiting on
it
+        break;
+      }
+    }
+  }
+
   private static void stopServer(Instance instance, final TCredentials credentials, final
boolean tabletServersToo) throws AccumuloException, AccumuloSecurityException {
     MasterClient.execute(HdfsZooInstance.getInstance(), new ClientExec<MasterClientService.Client>()
{
       @Override



Mime
View raw message