accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [1/3] accumulo git commit: ACCUMULO-4456 Add a Thrift exception to handle when a service is not the active instance
Date Thu, 15 Sep 2016 22:32:56 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master c48a2bac3 -> 780045ce0


http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/core/src/main/thrift/client.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/client.thrift b/core/src/main/thrift/client.thrift
index 38a8076..722faaf 100644
--- a/core/src/main/thrift/client.thrift
+++ b/core/src/main/thrift/client.thrift
@@ -94,6 +94,10 @@ exception ThriftTableOperationException {
   5:string description
 }
 
+exception ThriftNotActiveServiceException {
+
+}
+
 struct TDiskUsage {
   1:list<string> tables
   2:i64 usage

http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/core/src/main/thrift/master.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/master.thrift b/core/src/main/thrift/master.thrift
index b7d10f3..58829ec 100644
--- a/core/src/main/thrift/master.thrift
+++ b/core/src/main/thrift/master.thrift
@@ -160,47 +160,47 @@ enum FateOperation {
 
 service FateService {
   // register a fate operation by reserving an opid
-  i64 beginFateOperation(2:trace.TInfo tinfo, 1:security.TCredentials credentials) throws
(1:client.ThriftSecurityException sec)
+  i64 beginFateOperation(2:trace.TInfo tinfo, 1:security.TCredentials credentials) throws
(1:client.ThriftSecurityException sec, 2:client.ThriftNotActiveServiceException tnase)
   // initiate execution of the fate operation; set autoClean to true if not waiting for completion
-  void executeFateOperation(7:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:i64
opid, 3:FateOperation op, 4:list<binary> arguments, 5:map<string, string> options,
6:bool autoClean) throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException
tope)
+  void executeFateOperation(7:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:i64
opid, 3:FateOperation op, 4:list<binary> arguments, 5:map<string, string> options,
6:bool autoClean) throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException
tope, 3:client.ThriftNotActiveServiceException tnase)
   // wait for completion of the operation and get the returned exception, if any
-  string waitForFateOperation(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:i64
opid) throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException
tope)
+  string waitForFateOperation(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:i64
opid) throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException
tope, 3:client.ThriftNotActiveServiceException tnase)
   // clean up fate operation if autoClean was not set, after waiting
-  void finishFateOperation(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:i64
opid) throws (1:client.ThriftSecurityException sec)
+  void finishFateOperation(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:i64
opid) throws (1:client.ThriftSecurityException sec, 2:client.ThriftNotActiveServiceException
tnase)
 }
 
 service MasterClientService extends FateService {
 
   // table management methods
-  i64 initiateFlush(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string tableName)
throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException tope)
-  void waitForFlush(5:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string tableName,
6:binary startRow, 7:binary endRow, 3:i64 flushID, 4:i64 maxLoops) throws (1:client.ThriftSecurityException
sec, 2:client.ThriftTableOperationException tope)
+  i64 initiateFlush(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string tableName)
throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException tope,
3:client.ThriftNotActiveServiceException tnase)
+  void waitForFlush(5:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string tableName,
6:binary startRow, 7:binary endRow, 3:i64 flushID, 4:i64 maxLoops) throws (1:client.ThriftSecurityException
sec, 2:client.ThriftTableOperationException tope, 3:client.ThriftNotActiveServiceException
tnase)
 
-  void setTableProperty(5:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string
tableName, 3:string property, 4:string value) throws (1:client.ThriftSecurityException sec,
2:client.ThriftTableOperationException tope)
-  void removeTableProperty(4:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string
tableName, 3:string property) throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException
tope)
+  void setTableProperty(5:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string
tableName, 3:string property, 4:string value) throws (1:client.ThriftSecurityException sec,
2:client.ThriftTableOperationException tope, 3:client.ThriftNotActiveServiceException tnase)
+  void removeTableProperty(4:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string
tableName, 3:string property) throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException
tope, 3:client.ThriftNotActiveServiceException tnase)
 
-  void setNamespaceProperty(5:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string
ns, 3:string property, 4:string value) throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException
tope)
-  void removeNamespaceProperty(4:trace.TInfo tinfo, 1:security.TCredentials credentials,
2:string ns, 3:string property) throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException
tope)
+  void setNamespaceProperty(5:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string
ns, 3:string property, 4:string value) throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException
tope, 3:client.ThriftNotActiveServiceException tnase)
+  void removeNamespaceProperty(4:trace.TInfo tinfo, 1:security.TCredentials credentials,
2:string ns, 3:string property) throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException
tope, 3:client.ThriftNotActiveServiceException tnase)
 
   // system management methods
-  void setMasterGoalState(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:MasterGoalState
state) throws (1:client.ThriftSecurityException sec);
-  void shutdown(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:bool stopTabletServers)
throws (1:client.ThriftSecurityException sec)
-  void shutdownTabletServer(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string
tabletServer, 4:bool force) throws (1: client.ThriftSecurityException sec)
-  void setSystemProperty(4:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string
property, 3:string value) throws (1:client.ThriftSecurityException sec)
-  void removeSystemProperty(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string
property) throws (1:client.ThriftSecurityException sec)
+  void setMasterGoalState(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:MasterGoalState
state) throws (1:client.ThriftSecurityException sec, 2:client.ThriftNotActiveServiceException
tnase);
+  void shutdown(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:bool stopTabletServers)
throws (1:client.ThriftSecurityException sec, 2:client.ThriftNotActiveServiceException tnase)
+  void shutdownTabletServer(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string
tabletServer, 4:bool force) throws (1: client.ThriftSecurityException sec, 2:client.ThriftNotActiveServiceException
tnase)
+  void setSystemProperty(4:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string
property, 3:string value) throws (1:client.ThriftSecurityException sec, 2:client.ThriftNotActiveServiceException
tnase)
+  void removeSystemProperty(3:trace.TInfo tinfo, 1:security.TCredentials credentials, 2:string
property) throws (1:client.ThriftSecurityException sec, 2:client.ThriftNotActiveServiceException
tnase)
 
   // system monitoring methods
-  MasterMonitorInfo getMasterStats(2:trace.TInfo tinfo, 1:security.TCredentials credentials)
throws (1:client.ThriftSecurityException sec)
-  void waitForBalance(1:trace.TInfo tinfo) 
+  MasterMonitorInfo getMasterStats(2:trace.TInfo tinfo, 1:security.TCredentials credentials)
throws (1:client.ThriftSecurityException sec, 2:client.ThriftNotActiveServiceException tnase)
+  void waitForBalance(1:trace.TInfo tinfo) throws (1:client.ThriftNotActiveServiceException
tnase)
 
   // tablet server reporting
   oneway void reportSplitExtent(4:trace.TInfo tinfo, 1:security.TCredentials credentials,
2:string serverName, 3:TabletSplit split)
   oneway void reportTabletStatus(5:trace.TInfo tinfo, 1:security.TCredentials credentials,
2:string serverName, 3:TabletLoadState status, 4:data.TKeyExtent tablet)
 
-  list<string> getActiveTservers(1:trace.TInfo tinfo, 2:security.TCredentials credentials)
throws (1:client.ThriftSecurityException sec)
+  list<string> getActiveTservers(1:trace.TInfo tinfo, 2:security.TCredentials credentials)
throws (1:client.ThriftSecurityException sec, 2:client.ThriftNotActiveServiceException tnase)
 
   // Delegation token request
-  security.TDelegationToken getDelegationToken(1:trace.TInfo tinfo, 2:security.TCredentials
credentials, 3:security.TDelegationTokenConfig cfg) throws (1:client.ThriftSecurityException
sec)
+  security.TDelegationToken getDelegationToken(1:trace.TInfo tinfo, 2:security.TCredentials
credentials, 3:security.TDelegationTokenConfig cfg) throws (1:client.ThriftSecurityException
sec, 2:client.ThriftNotActiveServiceException tnase)
 
   // Determine when all provided logs are replicated
-  bool drainReplicationTable(1:trace.TInfo tfino, 2:security.TCredentials credentials, 3:string
tableName, 4:set<string> logsToWatch)
+  bool drainReplicationTable(1:trace.TInfo tfino, 2:security.TCredentials credentials, 3:string
tableName, 4:set<string> logsToWatch) throws (1:client.ThriftNotActiveServiceException
tnase)
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
index b8a0f64..84148ae 100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
@@ -65,6 +65,7 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.Credentials;
 import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
@@ -797,22 +798,26 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
    */
   public MasterMonitorInfo getMasterMonitorInfo() throws AccumuloException, AccumuloSecurityException
{
     MasterClientService.Iface client = null;
-    MasterMonitorInfo stats = null;
-    try {
-      Instance instance = new ZooKeeperInstance(getClientConfig());
-      ClientContext context = new ClientContext(instance, new Credentials("root", new PasswordToken("unchecked")),
getClientConfig());
-      client = MasterClient.getConnectionWithRetry(context);
-      stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
-    } catch (ThriftSecurityException exception) {
-      throw new AccumuloSecurityException(exception);
-    } catch (TException exception) {
-      throw new AccumuloException(exception);
-    } finally {
-      if (client != null) {
-        MasterClient.close(client);
+    while (true) {
+      try {
+        Instance instance = new ZooKeeperInstance(getClientConfig());
+        ClientContext context = new ClientContext(instance, new Credentials("root", new PasswordToken("unchecked")),
getClientConfig());
+        client = MasterClient.getConnectionWithRetry(context);
+        return client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
+      } catch (ThriftSecurityException exception) {
+        throw new AccumuloSecurityException(exception);
+      } catch (ThriftNotActiveServiceException e) {
+        // Let it loop, fetching a new location
+        log.debug("Contacted a Master which is no longer active, retrying");
+        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      } catch (TException exception) {
+        throw new AccumuloException(exception);
+      } finally {
+        if (client != null) {
+          MasterClient.close(client);
+        }
       }
     }
-    return stats;
   }
 
   public synchronized MiniDFSCluster getMiniDfs() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java
b/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java
index 2e8aa63..9b9572e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java
@@ -17,16 +17,22 @@
 package org.apache.accumulo.server.rpc;
 
 import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Objects;
 
+import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
 import org.apache.accumulo.server.HighlyAvailableService;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * An {@link InvocationHandler} which checks to see if a {@link HighlyAvailableService} is
the current active instance of that service, throwing
- * {@link NotActiveServiceException} when it is not the current active instance.
+ * {@link ThriftNotActiveServiceException} when it is not the current active instance.
  */
 public class HighlyAvailableServiceInvocationHandler<I> implements InvocationHandler
{
+  private static final Logger LOG = LoggerFactory.getLogger(HighlyAvailableServiceInvocationHandler.class);
 
   private final I instance;
   private final HighlyAvailableService service;
@@ -40,9 +46,14 @@ public class HighlyAvailableServiceInvocationHandler<I> implements
InvocationHan
   public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
     // If the service is not active, throw an exception
     if (!service.isActiveService()) {
-      throw new NotActiveServiceException();
+      LOG.trace("Denying access to RPC service as this instance is not the active instance.");
+      throw new ThriftNotActiveServiceException();
+    }
+    try {
+      // Otherwise, call the real method
+      return method.invoke(instance, args);
+    } catch (InvocationTargetException ex) {
+      throw ex.getCause();
     }
-    // Otherwise, call the real method
-    return method.invoke(instance, args);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/server/base/src/main/java/org/apache/accumulo/server/rpc/NotActiveServiceException.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/NotActiveServiceException.java
b/server/base/src/main/java/org/apache/accumulo/server/rpc/NotActiveServiceException.java
deleted file mode 100644
index 6846b67..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/NotActiveServiceException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.rpc;
-
-/**
- * An {@link Exception} which denotes that the service which was invoked is not the active
instance for that service in the Accumulo cluster.
- */
-public class NotActiveServiceException extends Exception {
-
-  private static final long serialVersionUID = 1L;
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 94f51cd..ec0b8f3 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -1140,7 +1140,7 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
     clientHandler = new MasterClientServiceHandler(this);
     // Ensure that calls before the master gets the lock fail
     Iface haProxy = HighlyAvailableServiceWrapper.service(clientHandler, this);
-    Iface rpcProxy = RpcWrapper.service(clientHandler, new Processor<Iface>(haProxy));
+    Iface rpcProxy = RpcWrapper.service(haProxy, new Processor<Iface>(clientHandler));
     final Processor<Iface> processor;
     if (ThriftServerType.SASL == getThriftServerType()) {
       Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass(),
getConfiguration());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/shell/src/main/java/org/apache/accumulo/shell/commands/ListBulkCommand.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ListBulkCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ListBulkCommand.java
index 8f09e8a..6f2e01e 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/ListBulkCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ListBulkCommand.java
@@ -16,11 +16,15 @@
  */
 package org.apache.accumulo.shell.commands;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
 import org.apache.accumulo.core.trace.Tracer;
@@ -48,13 +52,19 @@ public class ListBulkCommand extends Command {
 
     MasterMonitorInfo stats;
     MasterClientService.Iface client = null;
-    try {
-      AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(shellState.getInstance()));
-      client = MasterClient.getConnectionWithRetry(context);
-      stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
-    } finally {
-      if (client != null)
-        MasterClient.close(client);
+    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(shellState.getInstance()));
+    while (true) {
+      try {
+        client = MasterClient.getConnectionWithRetry(context);
+        stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
+        break;
+      } catch (ThriftNotActiveServiceException e) {
+        // Let it loop, fetching a new location
+        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      } finally {
+        if (client != null)
+          MasterClient.close(client);
+      }
     }
 
     final boolean paginate = !cl.hasOption(disablePaginationOpt.getOpt());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java b/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
index f207353..50e50b6 100644
--- a/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
@@ -16,13 +16,17 @@
  */
 package org.apache.accumulo.test;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
 import static org.junit.Assert.assertEquals;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.Credentials;
 import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.master.thrift.MasterClientService.Client;
@@ -83,13 +87,18 @@ public class DetectDeadTabletServersIT extends ConfigurableMacBase {
     Credentials creds = new Credentials("root", new PasswordToken(ROOT_PASSWORD));
     ClientContext context = new ClientContext(c.getInstance(), creds, getClientConfig());
     Client client = null;
-    try {
-      client = MasterClient.getConnectionWithRetry(context);
-      log.info("Fetching master stats");
-      return client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
-    } finally {
-      if (client != null) {
-        MasterClient.close(client);
+    while (true) {
+      try {
+        client = MasterClient.getConnectionWithRetry(context);
+        log.info("Fetching master stats");
+        return client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
+      } catch (ThriftNotActiveServiceException e) {
+        // Let it loop, fetching a new location
+        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      } finally {
+        if (client != null) {
+          MasterClient.close(client);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java b/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
index 0d0449e..5403b5f 100644
--- a/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/GetMasterStats.java
@@ -16,11 +16,15 @@
  */
 package org.apache.accumulo.test;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
 import org.apache.accumulo.core.master.thrift.BulkImportStatus;
 import org.apache.accumulo.core.master.thrift.DeadServer;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -38,13 +42,19 @@ public class GetMasterStats {
   public static void main(String[] args) throws Exception {
     MasterClientService.Iface client = null;
     MasterMonitorInfo stats = null;
-    try {
-      AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance()));
-      client = MasterClient.getConnectionWithRetry(context);
-      stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
-    } finally {
-      if (client != null)
-        MasterClient.close(client);
+    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance()));
+    while (true) {
+      try {
+        client = MasterClient.getConnectionWithRetry(context);
+        stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
+        break;
+      } catch (ThriftNotActiveServiceException e) {
+        // Let it loop, fetching a new location
+        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      } finally {
+        if (client != null)
+          MasterClient.close(client);
+      }
     }
     out(0, "State: " + stats.state.name());
     out(0, "Goal State: " + stats.goalState.name());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java
b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java
index b880085..8f8c791 100644
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java
@@ -16,9 +16,12 @@
  */
 package org.apache.accumulo.test.continuous;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 import java.util.Timer;
 import java.util.TimerTask;
 
@@ -30,6 +33,7 @@ import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.Credentials;
 import org.apache.accumulo.core.client.impl.MasterClient;
 import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.impl.KeyExtent;
@@ -136,37 +140,42 @@ public class ContinuousStatsCollector {
     private String getACUStats() throws Exception {
 
       MasterClientService.Iface client = null;
-      try {
-        ClientContext context = new ClientContext(opts.getInstance(), new Credentials(opts.getPrincipal(),
opts.getToken()), new ServerConfigurationFactory(
-            opts.getInstance()).getConfiguration());
-        client = MasterClient.getConnectionWithRetry(context);
-        MasterMonitorInfo stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
-
-        TableInfo all = new TableInfo();
-        Map<String,TableInfo> tableSummaries = new HashMap<>();
-
-        for (TabletServerStatus server : stats.tServerInfo) {
-          for (Entry<String,TableInfo> info : server.tableMap.entrySet()) {
-            TableInfo tableSummary = tableSummaries.get(info.getKey());
-            if (tableSummary == null) {
-              tableSummary = new TableInfo();
-              tableSummaries.put(info.getKey(), tableSummary);
+      while (true) {
+        try {
+          ClientContext context = new ClientContext(opts.getInstance(), new Credentials(opts.getPrincipal(),
opts.getToken()), new ServerConfigurationFactory(
+              opts.getInstance()).getConfiguration());
+          client = MasterClient.getConnectionWithRetry(context);
+          MasterMonitorInfo stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
+
+          TableInfo all = new TableInfo();
+          Map<String,TableInfo> tableSummaries = new HashMap<>();
+
+          for (TabletServerStatus server : stats.tServerInfo) {
+            for (Entry<String,TableInfo> info : server.tableMap.entrySet()) {
+              TableInfo tableSummary = tableSummaries.get(info.getKey());
+              if (tableSummary == null) {
+                tableSummary = new TableInfo();
+                tableSummaries.put(info.getKey(), tableSummary);
+              }
+              TableInfoUtil.add(tableSummary, info.getValue());
+              TableInfoUtil.add(all, info.getValue());
             }
-            TableInfoUtil.add(tableSummary, info.getValue());
-            TableInfoUtil.add(all, info.getValue());
           }
-        }
 
-        TableInfo ti = tableSummaries.get(tableId);
+          TableInfo ti = tableSummaries.get(tableId);
 
-        return "" + stats.tServerInfo.size() + " " + all.recs + " " + (long) all.ingestRate
+ " " + (long) all.queryRate + " " + ti.recs + " "
-            + ti.recsInMemory + " " + (long) ti.ingestRate + " " + (long) ti.queryRate +
" " + ti.tablets + " " + ti.onlineTablets;
+          return "" + stats.tServerInfo.size() + " " + all.recs + " " + (long) all.ingestRate
+ " " + (long) all.queryRate + " " + ti.recs + " "
+              + ti.recsInMemory + " " + (long) ti.ingestRate + " " + (long) ti.queryRate
+ " " + ti.tablets + " " + ti.onlineTablets;
 
-      } finally {
-        if (client != null)
-          MasterClient.close(client);
+        } catch (ThriftNotActiveServiceException e) {
+          // Let it loop, fetching a new location
+          log.debug("Contacted a Master which is no longer active, retrying");
+          sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        } finally {
+          if (client != null)
+            MasterClient.close(client);
+        }
       }
-
     }
 
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
index 525d9f9..527c055 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.test.functional;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -25,11 +26,13 @@ import java.util.Collection;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.Credentials;
 import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -104,12 +107,19 @@ public class BalanceAfterCommsFailureIT extends ConfigurableMacBase
{
     int unassignedTablets = 1;
     for (int i = 0; unassignedTablets > 0 && i < 10; i++) {
       MasterClientService.Iface client = null;
-      try {
-        client = MasterClient.getConnectionWithRetry(context);
-        stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
-      } finally {
-        if (client != null)
-          MasterClient.close(client);
+      while (true) {
+        try {
+          client = MasterClient.getConnectionWithRetry(context);
+          stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
+          break;
+        } catch (ThriftNotActiveServiceException e) {
+          // Let it loop, fetching a new location
+          log.debug("Contacted a Master which is no longer active, retrying");
+          sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        } finally {
+          if (client != null)
+            MasterClient.close(client);
+        }
       }
       unassignedTablets = stats.getUnassignedTablets();
       if (unassignedTablets > 0) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
index 2fe5602..7b7a118 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
@@ -16,10 +16,13 @@
  */
 package org.apache.accumulo.test.functional;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
 import java.util.Arrays;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.cli.BatchWriterOpts;
 import org.apache.accumulo.core.cli.ScannerOpts;
@@ -35,6 +38,7 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.Credentials;
 import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -151,17 +155,24 @@ public class BalanceInPresenceOfOfflineTableIT extends AccumuloClusterHarness
{
 
       MasterClientService.Iface client = null;
       MasterMonitorInfo stats = null;
-      try {
-        Instance instance = new ZooKeeperInstance(cluster.getClientConfig());
-        client = MasterClient.getConnectionWithRetry(new ClientContext(instance, creds, cluster.getClientConfig()));
-        stats = client.getMasterStats(Tracer.traceInfo(), creds.toThrift(instance));
-      } catch (ThriftSecurityException exception) {
-        throw new AccumuloSecurityException(exception);
-      } catch (TException exception) {
-        throw new AccumuloException(exception);
-      } finally {
-        if (client != null) {
-          MasterClient.close(client);
+      Instance instance = new ZooKeeperInstance(cluster.getClientConfig());
+      while (true) {
+        try {
+          client = MasterClient.getConnectionWithRetry(new ClientContext(instance, creds,
cluster.getClientConfig()));
+          stats = client.getMasterStats(Tracer.traceInfo(), creds.toThrift(instance));
+          break;
+        } catch (ThriftSecurityException exception) {
+          throw new AccumuloSecurityException(exception);
+        } catch (ThriftNotActiveServiceException e) {
+          // Let it loop, fetching a new location
+          log.debug("Contacted a Master which is no longer active, retrying");
+          sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        } catch (TException exception) {
+          throw new AccumuloException(exception);
+        } finally {
+          if (client != null) {
+            MasterClient.close(client);
+          }
         }
       }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java
index 62bac85..1188915 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/DynamicThreadPoolsIT.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.Credentials;
 import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -106,12 +107,18 @@ public class DynamicThreadPoolsIT extends AccumuloClusterHarness {
       int count = 0;
       MasterClientService.Iface client = null;
       MasterMonitorInfo stats = null;
-      try {
-        client = MasterClient.getConnectionWithRetry(new ClientContext(c.getInstance(), creds,
clientConf));
-        stats = client.getMasterStats(Tracer.traceInfo(), creds.toThrift(c.getInstance()));
-      } finally {
-        if (client != null)
-          MasterClient.close(client);
+      while (true) {
+        try {
+          client = MasterClient.getConnectionWithRetry(new ClientContext(c.getInstance(),
creds, clientConf));
+          stats = client.getMasterStats(Tracer.traceInfo(), creds.toThrift(c.getInstance()));
+          break;
+        } catch (ThriftNotActiveServiceException e) {
+          // Let it loop, fetching a new location
+          sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        } finally {
+          if (client != null)
+            MasterClient.close(client);
+        }
       }
       for (TabletServerStatus server : stats.tServerInfo) {
         for (TableInfo table : server.tableMap.values()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/test/src/main/java/org/apache/accumulo/test/functional/MetadataMaxFilesIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MetadataMaxFilesIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/MetadataMaxFilesIT.java
index 6c4939f..f331bbc 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/MetadataMaxFilesIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MetadataMaxFilesIT.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.Credentials;
 import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.master.thrift.MasterClientService.Client;
@@ -96,6 +97,10 @@ public class MetadataMaxFilesIT extends ConfigurableMacBase {
         client = MasterClient.getConnectionWithRetry(context);
         log.info("Fetching stats");
         stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
+      } catch (ThriftNotActiveServiceException e) {
+        // Let it loop, fetching a new location
+        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        continue;
       } finally {
         if (client != null)
           MasterClient.close(client);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/780045ce/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
index 864ba06..cbed280 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.Credentials;
 import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -82,12 +83,18 @@ public class SimpleBalancerFairnessIT extends ConfigurableMacBase {
     int unassignedTablets = 1;
     for (int i = 0; unassignedTablets > 0 && i < 10; i++) {
       MasterClientService.Iface client = null;
-      try {
-        client = MasterClient.getConnectionWithRetry(context);
-        stats = client.getMasterStats(Tracer.traceInfo(), creds.toThrift(c.getInstance()));
-      } finally {
-        if (client != null)
-          MasterClient.close(client);
+      while (true) {
+        try {
+          client = MasterClient.getConnectionWithRetry(context);
+          stats = client.getMasterStats(Tracer.traceInfo(), creds.toThrift(c.getInstance()));
+          break;
+        } catch (ThriftNotActiveServiceException e) {
+          // Let it loop, fetching a new location
+          sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        } finally {
+          if (client != null)
+            MasterClient.close(client);
+        }
       }
       unassignedTablets = stats.getUnassignedTablets();
       if (unassignedTablets > 0) {


Mime
View raw message