accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [1/2] git commit: ACCUMULO-1585 track tablet servers by their entry in zookeeper, not by their resolved address ACCUMULO-1601 make interface hinting consistent across all servers
Date Tue, 23 Jul 2013 15:29:11 GMT
Updated Branches:
  refs/heads/master 2ad6a8188 -> 80d1c3373


ACCUMULO-1585 track tablet servers by their entry in zookeeper, not by
their resolved address
ACCUMULO-1601 make interface hinting consistent across all servers


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/10b44e79
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/10b44e79
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/10b44e79

Branch: refs/heads/master
Commit: 10b44e79544b5f16cd747de7926af23739bf5726
Parents: 1a48f7c
Author: Eric Newton <ecn@apache.org>
Authored: Tue Jul 23 11:27:26 2013 -0400
Committer: Eric Newton <ecn@apache.org>
Committed: Tue Jul 23 11:27:26 2013 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/server/Accumulo.java    | 12 ---
 .../server/gc/SimpleGarbageCollector.java       | 71 +++++-----------
 .../accumulo/server/master/LiveTServerSet.java  | 89 +++++++++++---------
 .../apache/accumulo/server/master/Master.java   | 14 +--
 .../apache/accumulo/server/monitor/Monitor.java |  5 +-
 .../server/tabletserver/TabletServer.java       | 20 +++--
 .../accumulo/server/trace/TraceServer.java      |  5 +-
 .../accumulo/server/util/TServerUtils.java      | 35 ++++----
 .../accumulo/server/gc/TestConfirmDeletes.java  |  3 +-
 .../accumulo/test/functional/ZombieTServer.java |  8 +-
 .../test/performance/thrift/NullTserver.java    |  2 +-
 .../org/apache/accumulo/test/ShellServerIT.java |  1 +
 .../accumulo/test/functional/MacTest.java       |  3 -
 13 files changed, 121 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
index fa3f2f3..dc90a7b 100644
--- a/server/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -169,18 +169,6 @@ public class Accumulo {
     }, 1000, 10 * 60 * 1000);
   }
   
-  public static String getLocalAddress(String[] args) throws UnknownHostException {
-    InetAddress result = InetAddress.getLocalHost();
-    for (int i = 0; i < args.length - 1; i++) {
-      if (args[i].equals("-a") || args[i].equals("--address")) {
-        result = InetAddress.getByName(args[i + 1]);
-        log.debug("Local address is: " + args[i + 1] + " (" + result.toString() + ")");
-        break;
-      }
-    }
-    return result.getHostName();
-  }
-  
   public static void waitForZookeeperAndHdfs(VolumeManager fs) {
     log.info("Attempting to talk to zookeeper");
     while (true) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java b/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
index de73282..4387755 100644
--- a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
+++ b/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
@@ -37,7 +37,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.cli.Help;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
@@ -79,6 +78,7 @@ import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
 import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
@@ -107,7 +107,7 @@ import com.beust.jcommander.Parameter;
 public class SimpleGarbageCollector implements Iface {
   private static final Text EMPTY_TEXT = new Text();
   
-  static class Opts extends Help {
+  static class Opts extends ServerOpts {
     @Parameter(names = {"-v", "--verbose"}, description = "extra information will get printed
to stdout also")
     boolean verbose = false;
     @Parameter(names = {"-s", "--safemode"}, description = "safe mode will not delete files")
@@ -115,8 +115,6 @@ public class SimpleGarbageCollector implements Iface {
     @Parameter(names = {"-o", "--offline"},
         description = "offline mode will run once and check data files directly; this is
dangerous if accumulo is running or not shut down properly")
     boolean offline = false;
-    @Parameter(names = {"-a", "--address"}, description = "specify our local address")
-    String address = null;
   }
   
   // how much of the JVM's available memory should it use gathering candidates
@@ -130,8 +128,7 @@ public class SimpleGarbageCollector implements Iface {
   private boolean checkForBulkProcessingFiles;
   private VolumeManager fs;
   private boolean useTrash = true;
-  private boolean safemode = false, offline = false, verbose = false;
-  private String address = "localhost";
+  private Opts opts = new Opts();
   private ZooLock lock;
   private Key continueKey = null;
   
@@ -143,46 +140,21 @@ public class SimpleGarbageCollector implements Iface {
   
   public static void main(String[] args) throws UnknownHostException, IOException {
     SecurityUtil.serverLogin();
-    
     Instance instance = HdfsZooInstance.getInstance();
     ServerConfiguration serverConf = new ServerConfiguration(instance);
     final VolumeManager fs = VolumeManagerImpl.get();
     Accumulo.init(fs, serverConf, "gc");
-    String address = "localhost";
-    SimpleGarbageCollector gc = new SimpleGarbageCollector();
     Opts opts = new Opts();
-    opts.parseArgs(SimpleGarbageCollector.class.getName(), args);
-    
-    if (opts.safeMode)
-      gc.setSafeMode();
-    if (opts.offline)
-      gc.setOffline();
-    if (opts.verbose)
-      gc.setVerbose();
-    if (opts.address != null)
-      gc.useAddress(address);
+    opts.parseArgs("gc", args);
+    SimpleGarbageCollector gc = new SimpleGarbageCollector(opts);
     
     gc.init(fs, instance, SystemCredentials.get().getAsThrift(), serverConf.getConfiguration().getBoolean(Property.GC_TRASH_IGNORE));
-    Accumulo.enableTracing(address, "gc");
+    Accumulo.enableTracing(opts.getAddress(), "gc");
     gc.run();
   }
   
-  public SimpleGarbageCollector() {}
-  
-  public void setSafeMode() {
-    this.safemode = true;
-  }
-  
-  public void setOffline() {
-    this.offline = true;
-  }
-  
-  public void setVerbose() {
-    this.verbose = true;
-  }
-  
-  public void useAddress(String address) {
-    this.address = address;
+  public SimpleGarbageCollector(Opts opts) {
+    this.opts = opts;
   }
   
   public void init(VolumeManager fs, Instance instance, TCredentials credentials, boolean
noTrash) throws IOException {
@@ -193,11 +165,11 @@ public class SimpleGarbageCollector implements Iface {
     gcStartDelay = instance.getConfiguration().getTimeInMillis(Property.GC_CYCLE_START);
     long gcDelay = instance.getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY);
     numDeleteThreads = instance.getConfiguration().getCount(Property.GC_DELETE_THREADS);
-    log.info("start delay: " + (offline ? 0 + " sec (offline)" : gcStartDelay + " milliseconds"));
+    log.info("start delay: " + (opts.offline ? 0 + " sec (offline)" : gcStartDelay + " milliseconds"));
     log.info("time delay: " + gcDelay + " milliseconds");
-    log.info("safemode: " + safemode);
-    log.info("offline: " + offline);
-    log.info("verbose: " + verbose);
+    log.info("safemode: " + opts.safeMode);
+    log.info("offline: " + opts.offline);
+    log.info("verbose: " + opts.verbose);
     log.info("memory threshold: " + CANDIDATE_MEMORY_PERCENTAGE + " of " + Runtime.getRuntime().maxMemory()
+ " bytes");
     log.info("delete threads: " + numDeleteThreads);
     useTrash = !noTrash;
@@ -208,7 +180,7 @@ public class SimpleGarbageCollector implements Iface {
     
     // Sleep for an initial period, giving the master time to start up and
     // old data files to be unused
-    if (!offline) {
+    if (!opts.offline) {
       try {
         getZooLock(startStatsService());
       } catch (Exception ex) {
@@ -254,8 +226,8 @@ public class SimpleGarbageCollector implements Iface {
         confirmDeletesSpan.stop();
         
         // STEP 3: delete files
-        if (safemode) {
-          if (verbose)
+        if (opts.safeMode) {
+          if (opts.verbose)
             System.out.println("SAFEMODE: There are " + candidates.size() + " data file candidates
marked for deletion.%n"
                 + "          Examine the log files to identify them.%n" + "          They
can be removed by executing: bin/accumulo gc --offline%n"
                 + "WARNING:  Do not run the garbage collector in offline mode unless you
are positive%n"
@@ -289,7 +261,7 @@ public class SimpleGarbageCollector implements Iface {
       tStop = System.currentTimeMillis();
       log.info(String.format("Collect cycle took %.2f seconds", ((tStop - tStart) / 1000.0)));
       
-      if (offline)
+      if (opts.offline)
         break;
       
       if (candidateMemExceeded) {
@@ -421,13 +393,14 @@ public class SimpleGarbageCollector implements Iface {
     Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(this));
     int port = instance.getConfiguration().getPort(Property.GC_PORT);
     long maxMessageSize = instance.getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
+    InetSocketAddress result = new InetSocketAddress(opts.getAddress(), port);
     try {
-      TServerUtils.startTServer(port, processor, this.getClass().getSimpleName(), "GC Monitor
Service", 2, 1000, maxMessageSize);
+      TServerUtils.startTServer(result, processor, this.getClass().getSimpleName(), "GC Monitor
Service", 2, 1000, maxMessageSize);
     } catch (Exception ex) {
       log.fatal(ex, ex);
       throw new RuntimeException(ex);
     }
-    return new InetSocketAddress(Accumulo.getLocalAddress(new String[] {"--address", address}),
port);
+    return result;
   }
   
   /**
@@ -436,7 +409,7 @@ public class SimpleGarbageCollector implements Iface {
   SortedSet<String> getCandidates() throws Exception {
     TreeSet<String> candidates = new TreeSet<String>();
     
-    if (offline) {
+    if (opts.offline) {
       checkForBulkProcessingFiles = true;
       try {
         for (String validExtension : FileOperations.getValidExtensions()) {
@@ -521,7 +494,7 @@ public class SimpleGarbageCollector implements Iface {
   
   private void confirmDeletes(String tableName, SortedSet<String> candidates) throws
AccumuloException {
     Scanner scanner;
-    if (offline) {
+    if (opts.offline) {
       // TODO
       throw new RuntimeException("Offline scanner no longer supported");
       // try {
@@ -634,7 +607,7 @@ public class SimpleGarbageCollector implements Iface {
     // deletes; Need separate writer for the root tablet.
     BatchWriter writer = null;
     BatchWriter rootWriter = null;
-    if (!offline) {
+    if (!opts.offline) {
       Connector c;
       try {
         c = instance.getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
index 68255b8..3cb73b6 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.core.Constants;
@@ -201,8 +202,12 @@ public class LiveTServerSet implements Watcher {
     }
   };
   
-  // Map from tserver master service to server information
+  // The set of active tservers with locks, indexed by their name in zookeeper
   private Map<String,TServerInfo> current = new HashMap<String,TServerInfo>();
+  // as above, indexed by TServerInstance
+  private Map<TServerInstance, TServerInfo> currentInstances = new HashMap<TServerInstance,
TServerInfo>();
+  
+  // The set of entries in zookeeper without locks, and the first time each was noticed
   private Map<String,Long> locklessServers = new HashMap<String,Long>();
   
   public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) {
@@ -240,8 +245,8 @@ public class LiveTServerSet implements Watcher {
       
       locklessServers.keySet().retainAll(all);
       
-      for (String server : all) {
-        checkServer(updates, doomed, path, server);
+      for (String zPath : all) {
+        checkServer(updates, doomed, path, zPath);
       }
       
       // log.debug("Current: " + current.keySet());
@@ -262,42 +267,46 @@ public class LiveTServerSet implements Watcher {
     }
   }
   
-  private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance>
doomed, final String path, final String server)
+  private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance>
doomed, final String path, final String zPath)
       throws TException, InterruptedException, KeeperException {
     
-    TServerInfo info = current.get(server);
+    TServerInfo info = current.get(zPath);
     
-    final String lockPath = path + "/" + server;
+    final String lockPath = path + "/" + zPath;
     Stat stat = new Stat();
     byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat);
     
     if (lockData == null) {
       if (info != null) {
         doomed.add(info.instance);
-        current.remove(server);
+        current.remove(zPath);
+        currentInstances.remove(info.instance);
       }
       
-      Long firstSeen = locklessServers.get(server);
+      Long firstSeen = locklessServers.get(zPath);
       if (firstSeen == null) {
-        locklessServers.put(server, System.currentTimeMillis());
-      } else if (System.currentTimeMillis() - firstSeen > 600000) {
-        deleteServerNode(path + "/" + server);
-        locklessServers.remove(server);
+        locklessServers.put(zPath, System.currentTimeMillis());
+      } else if (System.currentTimeMillis() - firstSeen > 10*60*1000) {
+        deleteServerNode(path + "/" + zPath);
+        locklessServers.remove(zPath);
       }
     } else {
-      locklessServers.remove(server);
+      locklessServers.remove(zPath);
       ServerServices services = new ServerServices(new String(lockData));
       InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
-      InetSocketAddress addr = AddressUtil.parseAddress(server);
       TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner());
       
       if (info == null) {
         updates.add(instance);
-        current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
+        TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client));
+        current.put(zPath, tServerInfo);
+        currentInstances.put(instance, tServerInfo);
       } else if (!info.instance.equals(instance)) {
         doomed.add(info.instance);
         updates.add(instance);
-        current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
+        TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client));
+        current.put(zPath, tServerInfo);
+        currentInstances.put(info.instance, tServerInfo);
       }
     }
   }
@@ -339,53 +348,51 @@ public class LiveTServerSet implements Watcher {
   public synchronized TServerConnection getConnection(TServerInstance server) throws TException
{
     if (server == null)
       return null;
-    TServerInfo serverInfo = current.get(server.hostPort());
-    // lock was lost?
-    if (serverInfo == null)
-      return null;
-    // instance changed?
-    if (!serverInfo.instance.equals(server))
+    TServerInfo tServerInfo = currentInstances.get(server);
+    if (tServerInfo == null)
       return null;
-    TServerConnection result = serverInfo.connection;
-    return result;
+    return tServerInfo.connection;
   }
   
   public synchronized Set<TServerInstance> getCurrentServers() {
-    HashSet<TServerInstance> result = new HashSet<TServerInstance>();
-    for (TServerInfo c : current.values()) {
-      result.add(c.instance);
-    }
-    return result;
+    return new HashSet<TServerInstance>(currentInstances.keySet());
   }
   
   public synchronized int size() {
     return current.size();
   }
   
-  public synchronized TServerInstance find(String serverName) {
-    TServerInfo serverInfo = current.get(serverName);
-    if (serverInfo != null) {
-      return serverInfo.instance;
+  public synchronized TServerInstance find(String tabletServer) {
+    InetSocketAddress addr = AddressUtil.parseAddress(tabletServer);
+    for (Entry<String,TServerInfo> entry : current.entrySet()) {
+      if (entry.getValue().instance.getLocation().equals(addr))
+        return entry.getValue().instance;
     }
     return null;
   }
   
-  public synchronized boolean isOnline(String serverName) {
-    return current.containsKey(serverName);
-  }
-  
   public synchronized void remove(TServerInstance server) {
-    current.remove(server.hostPort());
+    String zPath = null;
+    for (Entry<String,TServerInfo> entry : current.entrySet()) {
+      if (entry.getValue().instance.equals(server)) {
+        zPath = entry.getKey();
+        break;
+      }
+    }
+    if (zPath == null)
+      return;
+    current.remove(zPath);
+    currentInstances.remove(server);
     
     log.info("Removing zookeeper lock for " + server);
-    String zpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + server.hostPort();
+    String fullpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + zPath;
     try {
-      ZooReaderWriter.getRetryingInstance().recursiveDelete(zpath, SKIP);
+      ZooReaderWriter.getRetryingInstance().recursiveDelete(fullpath, SKIP);
     } catch (Exception e) {
       String msg = "error removing tablet server lock";
       log.fatal(msg, e);
       Halt.halt(msg, -1);
     }
-    getZooCache().clear(zpath);
+    getZooCache().clear(fullpath);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/master/Master.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/Master.java b/server/src/main/java/org/apache/accumulo/server/master/Master.java
index 3d14d12..bb7fa62 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/Master.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java
@@ -89,6 +89,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
@@ -686,9 +687,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
     public void shutdownTabletServer(TInfo info, TCredentials c, String tabletServer, boolean
force) throws ThriftSecurityException, TException {
       security.canPerformSystemActions(c);
       
-      final InetSocketAddress addr = AddressUtil.parseAddress(tabletServer);
-      final String addrString = org.apache.accumulo.core.util.AddressUtil.toString(addr);
-      final TServerInstance doomed = tserverSet.find(addrString);
+      final TServerInstance doomed = tserverSet.find(tabletServer);
       if (!force) {
         final TServerConnection server = tserverSet.getConnection(doomed);
         if (server == null) {
@@ -1512,7 +1511,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
     }
     
     Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new MasterClientServiceHandler()));
-    clientService = TServerUtils.startServer(getSystemConfiguration(), Property.MASTER_CLIENTPORT,
processor, "Master", "Master Client Service Handler", null,
+    clientService = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT,
processor, "Master", "Master Client Service Handler", null,
         Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE).server;
     
     while (!clientService.isServing()) {
@@ -1597,8 +1596,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
   private void getMasterLock(final String zMasterLoc) throws KeeperException, InterruptedException
{
     log.info("trying to get master lock");
     
-    final String masterClientAddress = org.apache.accumulo.core.util.AddressUtil.toString(new
InetSocketAddress(hostname, getSystemConfiguration().getPort(
-        Property.MASTER_CLIENTPORT)));
+    final String masterClientAddress = hostname + ":" + getSystemConfiguration().getPort(Property.MASTER_CLIENTPORT);
     
     while (true) {
       
@@ -1629,7 +1627,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
       SecurityUtil.serverLogin();
       
       VolumeManager fs = VolumeManagerImpl.get();
-      String hostname = Accumulo.getLocalAddress(args);
+      ServerOpts opts = new ServerOpts();
+      opts.parseArgs("master", args);
+      String hostname = opts.getAddress();
       Instance instance = HdfsZooInstance.getInstance();
       ServerConfiguration conf = new ServerConfiguration(instance);
       Accumulo.init(fs, conf, "master");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
index 5957f26..2632b4e 100644
--- a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
+++ b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
@@ -49,6 +49,7 @@ import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -450,7 +451,9 @@ public class Monitor {
     SecurityUtil.serverLogin();
     
     VolumeManager fs = VolumeManagerImpl.get();
-    String hostname = Accumulo.getLocalAddress(args);
+    ServerOpts opts = new ServerOpts();
+    opts.parseArgs("monitor", args);
+    String hostname = opts.getAddress();
     instance = HdfsZooInstance.getInstance();
     config = new ServerConfiguration(instance);
     Accumulo.init(fs, config, "monitor");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 9824c64..7425fed 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -117,7 +117,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
 import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
-import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.ByteBufferUtil;
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.Daemon;
@@ -136,6 +135,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.client.ClientServiceHandler;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
@@ -187,7 +187,7 @@ import org.apache.accumulo.server.util.MapCounter;
 import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.util.MetadataTableUtil.LogEntry;
 import org.apache.accumulo.server.util.TServerUtils;
-import org.apache.accumulo.server.util.TServerUtils.ServerPort;
+import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
 import org.apache.accumulo.server.util.time.RelativeTime;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
@@ -2611,11 +2611,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     MetadataTableUtil.addLogEntry(SystemCredentials.get().getAsThrift(), entry, getLock());
   }
   
-  private int startServer(AccumuloConfiguration conf, Property portHint, TProcessor processor,
String threadName) throws UnknownHostException {
-    ServerPort sp = TServerUtils.startServer(conf, portHint, processor, this.getClass().getSimpleName(),
threadName, Property.TSERV_PORTSEARCH,
+  private int startServer(AccumuloConfiguration conf, String address, Property portHint,
TProcessor processor, String threadName) throws UnknownHostException {
+    ServerAddress sp = TServerUtils.startServer(conf, address, portHint, processor, this.getClass().getSimpleName(),
threadName, Property.TSERV_PORTSEARCH,
         Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
     this.server = sp.server;
-    return sp.port;
+    return sp.address.getPort();
   }
   
   private String getMasterAddress() {
@@ -2655,7 +2655,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     // start listening for client connection last
     Iface tch = TraceWrap.service(new ThriftClientHandler());
     Processor<Iface> processor = new Processor<Iface>(tch);
-    int port = startServer(getSystemConfiguration(), Property.TSERV_CLIENTPORT, processor,
"Thrift Client Server");
+    int port = startServer(getSystemConfiguration(), clientAddress.getHostName(), Property.TSERV_CLIENTPORT,
processor, "Thrift Client Server");
     log.info("port = " + port);
     return port;
   }
@@ -2733,7 +2733,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     if (clientPort == 0) {
       throw new RuntimeException("Failed to start the tablet client service");
     }
-    clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort);
+    clientAddress = new InetSocketAddress(clientAddress.getHostName(), clientPort);
     announceExistence();
     
     ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS),
"distributed work queue");
@@ -3012,7 +3012,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
   public String getClientAddressString() {
     if (clientAddress == null)
       return null;
-    return AddressUtil.toString(clientAddress);
+    return clientAddress.getHostName() + ":" + clientAddress.getPort();
   }
   
   TServerInstance getTabletSession() {
@@ -3213,7 +3213,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     try {
       SecurityUtil.serverLogin();
       VolumeManager fs = VolumeManagerImpl.get();
-      String hostname = Accumulo.getLocalAddress(args);
+      ServerOpts opts = new ServerOpts();
+      opts.parseArgs("tserver", args);
+      String hostname = opts.getAddress();
       Instance instance = HdfsZooInstance.getInstance();
       ServerConfiguration conf = new ServerConfiguration(instance);
       Accumulo.init(fs, conf, "tserver");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java b/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
index 67a55fa..5875182 100644
--- a/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
@@ -45,6 +45,7 @@ import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -256,11 +257,13 @@ public class TraceServer implements Watcher {
   
   public static void main(String[] args) throws Exception {
     SecurityUtil.serverLogin();
+    ServerOpts opts = new ServerOpts();
+    opts.parseArgs("tracer", args);
     Instance instance = HdfsZooInstance.getInstance();
     ServerConfiguration conf = new ServerConfiguration(instance);
     VolumeManager fs = VolumeManagerImpl.get();
     Accumulo.init(fs, conf, "tracer");
-    String hostname = Accumulo.getLocalAddress(args);
+    String hostname = opts.getAddress();
     TraceServer server = new TraceServer(conf, hostname);
     Accumulo.enableTracing(hostname, "tserver");
     server.run();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index 0c751f5..926e370 100644
--- a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@ -55,13 +55,13 @@ public class TServerUtils {
   
   public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
   
-  public static class ServerPort {
+  public static class ServerAddress {
     public final TServer server;
-    public final int port;
+    public final InetSocketAddress address;
     
-    public ServerPort(TServer server, int port) {
+    public ServerAddress(TServer server, InetSocketAddress address) {
       this.server = server;
-      this.port = port;
+      this.address = address;
     }
   }
   
@@ -83,7 +83,7 @@ public class TServerUtils {
    * @throws UnknownHostException
    *           when we don't know our own address
    */
-  public static ServerPort startServer(AccumuloConfiguration conf, Property portHintProperty, TProcessor processor, String serverName, String threadName,
+  public static ServerAddress startServer(AccumuloConfiguration conf, String address, Property portHintProperty, TProcessor processor, String serverName, String threadName,
       Property portSearchProperty,
       Property minThreadProperty, 
       Property timeBetweenThreadChecksProperty, 
@@ -118,7 +118,8 @@ public class TServerUtils {
         if (port > 65535)
           port = 1024 + port % (65535 - 1024);
         try {
-          return TServerUtils.startTServer(port, timedProcessor, serverName, threadName, minThreads, timeBetweenThreadChecks, maxMessageSize);
+          InetSocketAddress addr = new InetSocketAddress(address, port);
+          return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads, timeBetweenThreadChecks, maxMessageSize);
         } catch (Exception ex) {
           log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")");
           UtilWaitThread.sleep(250);
@@ -212,9 +213,9 @@ public class TServerUtils {
     }
   }
   
-  public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, final int numThreads,
+  public static ServerAddress startHsHaServer(InetSocketAddress address, TProcessor processor, final String serverName, String threadName, final int numThreads,
       long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
-    TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
+    TNonblockingServerSocket transport = new TNonblockingServerSocket(address);
     THsHaServer.Args options = new THsHaServer.Args(transport);
     options.protocolFactory(ThriftUtil.protocolFactory());
     options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
@@ -249,10 +250,10 @@ public class TServerUtils {
     }, timeBetweenThreadChecks, timeBetweenThreadChecks);
     options.executorService(pool);
     options.processorFactory(new TProcessorFactory(processor));
-    return new ServerPort(new THsHaServer(options), port);
+    return new ServerAddress(new THsHaServer(options), address);
   }
   
-  public static ServerPort startThreadPoolServer(int port, TProcessor processor, String serverName, String threadName, int numThreads)
+  public static ServerAddress startThreadPoolServer(InetSocketAddress address, TProcessor processor, String serverName, String threadName, int numThreads)
       throws TTransportException {
     
     // if port is zero, then we must bind to get the port number
@@ -260,8 +261,8 @@ public class TServerUtils {
     try {
       sock = ServerSocketChannel.open().socket();
       sock.setReuseAddress(true);
-      sock.bind(new InetSocketAddress(port));
-      port = sock.getLocalPort();
+      sock.bind(address);
+      address = new InetSocketAddress(address.getHostName(), sock.getLocalPort());
     } catch (IOException ex) {
       throw new TTransportException(ex);
     }
@@ -270,17 +271,17 @@ public class TServerUtils {
     options.protocolFactory(ThriftUtil.protocolFactory());
     options.transportFactory(ThriftUtil.transportFactory());
     options.processorFactory(new ClientInfoProcessorFactory(processor));
-    return new ServerPort(new TThreadPoolServer(options), port);
+    return new ServerAddress(new TThreadPoolServer(options), address);
   }
   
-  public static ServerPort startTServer(int port, TProcessor processor, String serverName, String threadName, int numThreads, long timeBetweenThreadChecks, long maxMessageSize)
+  public static ServerAddress startTServer(InetSocketAddress address, TProcessor processor, String serverName, String threadName, int numThreads, long timeBetweenThreadChecks, long maxMessageSize)
       throws TTransportException {
-    return startTServer(port, new TimedProcessor(processor, serverName, threadName), serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize);
+    return startTServer(address, new TimedProcessor(processor, serverName, threadName), serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize);
   }
   
-  public static ServerPort startTServer(int port, TimedProcessor processor, String serverName, String threadName, int numThreads, long timeBetweenThreadChecks, long maxMessageSize)
+  public static ServerAddress startTServer(InetSocketAddress address, TimedProcessor processor, String serverName, String threadName, int numThreads, long timeBetweenThreadChecks, long maxMessageSize)
       throws TTransportException {
-    ServerPort result = startHsHaServer(port, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize);
+    ServerAddress result = startHsHaServer(address, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize);
     // ServerPort result = startThreadPoolServer(port, processor, serverName, threadName, -1);
     final TServer finalServer = result.server;
     Runnable serveTask = new Runnable() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java b/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
index 9ac0b50..28d727d 100644
--- a/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
+++ b/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.security.CredentialHelper;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.gc.SimpleGarbageCollector.Opts;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
 import org.junit.Test;
@@ -98,7 +99,7 @@ public class TestConfirmDeletes {
     
     load(instance, metadata, deletes);
     
-    SimpleGarbageCollector gc = new SimpleGarbageCollector();
+    SimpleGarbageCollector gc = new SimpleGarbageCollector(new Opts());
     gc.init(fs, instance, auth, false);
     SortedSet<String> candidates = gc.getCandidates();
     Assert.assertEquals(expectedInitial, candidates.size());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index 2afca25..ab40304 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.accumulo.test.functional;
 
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Random;
@@ -39,7 +38,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.util.TServerUtils;
-import org.apache.accumulo.server.util.TServerUtils.ServerPort;
+import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -99,10 +98,9 @@ public class ZombieTServer {
     TransactionWatcher watcher = new TransactionWatcher();
     final ThriftClientHandler tch = new ThriftClientHandler(instance, watcher);
     Processor<Iface> processor = new Processor<Iface>(tch);
-    ServerPort serverPort = TServerUtils.startTServer(port, processor, "ZombieTServer", "walking dead", 2, 1000, 10*1024*1024);
+    ServerAddress serverPort = TServerUtils.startTServer(new InetSocketAddress(port), processor, "ZombieTServer", "walking dead", 2, 1000, 10*1024*1024);
     
-    InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost(), serverPort.port);
-    String addressString = AddressUtil.toString(addr);
+    String addressString = AddressUtil.toString(serverPort.address);
     String zPath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + addressString;
     ZooReaderWriter zoo = ZooReaderWriter.getInstance();
     zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index 41a4d54..e33603f 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -220,7 +220,7 @@ public class NullTserver {
     TransactionWatcher watcher = new TransactionWatcher();
     ThriftClientHandler tch = new ThriftClientHandler(HdfsZooInstance.getInstance(), watcher);
     Processor<Iface> processor = new Processor<Iface>(tch);
-    TServerUtils.startTServer(opts.port, processor, "NullTServer", "null tserver", 2, 1000, 10 * 1024 * 1024);
+    TServerUtils.startTServer(new InetSocketAddress(opts.port), processor, "NullTServer", "null tserver", 2, 1000, 10 * 1024 * 1024);
     
     InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost(), opts.port);
     

http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
index 37c7e43..b3d6479 100644
--- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
@@ -627,6 +627,7 @@ public class ShellServerIT {
     exec("insert \\x02 cf cq value", true);
     exec("scan -b 02", true, "value", false);
     exec("interpreter -i org.apache.accumulo.core.util.interpret.HexScanInterpreter", true);
+    UtilWaitThread.sleep(500);
     exec("interpreter -l", true, "HexScan", true);
     exec("scan -b 02", true, "value", true);
     exec("deletetable -f t", true);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/test/src/test/java/org/apache/accumulo/test/functional/MacTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/MacTest.java b/test/src/test/java/org/apache/accumulo/test/functional/MacTest.java
index 5fe60e2..19dfa6b 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/MacTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/MacTest.java
@@ -22,7 +22,6 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
-import org.apache.accumulo.minicluster.MiniAccumuloCluster.LogWriter;
 import org.apache.accumulo.minicluster.MiniAccumuloConfig;
 import org.apache.log4j.Logger;
 import org.junit.After;
@@ -58,8 +57,6 @@ public class MacTest {
   public void tearDown() throws Exception {
     if (cluster != null)
       cluster.stop();
-    for (LogWriter log : cluster.getLogWriters())
-      log.flush();
     folder.delete();
   }
   


Mime
View raw message