accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cjno...@apache.org
Subject svn commit: r1483437 - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accumulo/core/util/ core/src/test/java/org/apache/accumulo/core/...
Date Thu, 16 May 2013 16:42:30 GMT
Author: cjnolet
Date: Thu May 16 16:42:29 2013
New Revision: 1483437

URL: http://svn.apache.org/r1483437
Log:
ACCUMULO-1393 Removed all code that assumes TSERVER/MASTER don't have port attached to address
in metadata table and zookeeper
ACCUMULO-1287 Add "Master" column on Master Server monitor page to the legend

Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperationsImpl.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ServerServices.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/util/AddressUtilTest.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Admin.java
    accumulo/trunk/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperationsImpl.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperationsImpl.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperationsImpl.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperationsImpl.java
Thu May 16 16:42:29 2013
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.master.t
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
+import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
@@ -224,7 +225,7 @@ public class InstanceOperationsImpl impl
   public void ping(String tserver) throws AccumuloException {
     TTransport transport = null;
     try {
-      transport = ThriftUtil.createTransport(tserver, instance.getConfiguration().getPort(Property.TSERV_CLIENTPORT),
instance.getConfiguration());
+      transport = ThriftUtil.createTransport(AddressUtil.parseAddress(tserver), instance.getConfiguration());
       TabletClientService.Client client = ThriftUtil.createClient(new TabletClientService.Client.Factory(),
transport);
       client.getTabletServerStatus(Tracer.traceInfo(), credentials);
     } catch (TTransportException e) {

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
Thu May 16 16:42:29 2013
@@ -23,7 +23,6 @@ import org.apache.accumulo.core.client.A
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.accumulo.core.util.ThriftUtil;
@@ -57,19 +56,16 @@ public class MasterClient {
     }
     
     String master = locations.get(0);
-    int portHint = instance.getConfiguration().getPort(Property.MASTER_CLIENTPORT);
-    
     try {
       // Master requests can take a long time: don't ever time out
-      MasterClientService.Client client = ThriftUtil.getClient(new MasterClientService.Client.Factory(),
master, Property.MASTER_CLIENTPORT,
-          instance.getConfiguration());
+      MasterClientService.Client client = ThriftUtil.getClientNoTimeout(new MasterClientService.Client.Factory(),
master);
       return client;
     } catch (TTransportException tte) {
       if (tte.getCause().getClass().equals(UnknownHostException.class)) {
         // do not expect to recover from this
         throw new RuntimeException(tte);
       }
-      log.debug("Failed to connect to master=" + master + " portHint=" + portHint + ", will
retry... ", tte);
+      log.debug("Failed to connect to master=" + master + ", will retry... ", tte);
       return null;
     }
   }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
Thu May 16 16:42:29 2013
@@ -134,17 +134,14 @@ public class ServerClient {
     ArrayList<ThriftTransportKey> servers = new ArrayList<ThriftTransportKey>();
     
     // add tservers
-    
     ZooCache zc = getZooCache(instance);
-    AccumuloConfiguration conf = instance.getConfiguration();
     for (String tserver : zc.getChildren(ZooUtil.getRoot(instance) + Constants.ZTSERVERS))
{
       String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + tserver;
       byte[] data = ZooUtil.getLockData(zc, path);
       if (data != null && !new String(data).equals("master"))
         servers.add(new ThriftTransportKey(
-            new ServerServices(new String(data)).getAddressString(Service.TSERV_CLIENT),

-            conf.getPort(Property.TSERV_CLIENTPORT), 
-            rpcTimeout));
+          new ServerServices(new String(data)).getAddressString(Service.TSERV_CLIENT),
+          rpcTimeout));
     }
     
     boolean opened = false;

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
Thu May 16 16:42:29 2013
@@ -633,7 +633,7 @@ public class TabletServerBatchReaderIter
     try {
       TabletClientService.Client client;
       if (timeoutTracker.getTimeOut() < conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
-        client = ThriftUtil.getTServerClient(server, conf, timeoutTracker.getTimeOut());
+        client = ThriftUtil.getTServerClient(server, timeoutTracker.getTimeOut());
       else
         client = ThriftUtil.getTServerClient(server, conf);
       

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
Thu May 16 16:42:29 2013
@@ -544,7 +544,7 @@ public class TabletServerBatchWriter {
   /**
    * Add mutations that previously failed back into the mix
    * 
-   * @param mutationsprivate
+   * @param failedMutations
    *          static final Logger log = Logger.getLogger(TabletServerBatchWriter.class);
    */
   private synchronized void addFailedMutations(MutationSet failedMutations) throws Exception
{
@@ -856,7 +856,7 @@ public class TabletServerBatchWriter {
         TabletClientService.Iface client;
         
         if (timeoutTracker.getTimeOut() < instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
-          client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeoutTracker.getTimeOut());
+          client = ThriftUtil.getTServerClient(location, timeoutTracker.getTimeOut());
         else
           client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
         

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
Thu May 16 16:42:29 2013
@@ -25,10 +25,16 @@ class ThriftTransportKey {
   
   private int hash = -1;
   
-  ThriftTransportKey(String location, int port, long timeout) {
+  ThriftTransportKey(String location, long timeout) {
+    
     ArgumentChecker.notNull(location);
-    this.location = location;
-    this.port = port;
+    String[] locationAndPort = location.split(":", 2);
+    if (locationAndPort.length == 2) {
+      this.location = locationAndPort[0];
+      this.port = Integer.parseInt(locationAndPort[1]);
+    } else
+      throw new IllegalArgumentException("Location was expected to contain port but did not.
location=" + location);
+    
     this.timeout = timeout;
   }
   

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
Thu May 16 16:42:29 2013
@@ -358,20 +358,35 @@ public class ThriftTransportPool {
   
   private ThriftTransportPool() {}
   
-  public TTransport getTransport(String location, int port) throws TTransportException {
-    return getTransport(location, port, 0);
-  }
-  
   public TTransport getTransportWithDefaultTimeout(InetSocketAddress addr, AccumuloConfiguration
conf) throws TTransportException {
-    return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+    return getTransport(String.format("%s:%d", addr.getAddress().getHostAddress(), addr.getPort()),
conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
   }
   
-  public TTransport getTransport(InetSocketAddress addr, long timeout) throws TTransportException
{
-    return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), timeout);
+  public TTransport getTransport(String location, long milliseconds) throws TTransportException
{
+    return getTransport(new ThriftTransportKey(location, milliseconds));
   }
   
-  public TTransport getTransportWithDefaultTimeout(String location, int port, AccumuloConfiguration
conf) throws TTransportException {
-    return getTransport(location, port, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+  private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException
{
+    synchronized (this) {
+      // atomically reserve location if it exist in cache
+      List<CachedConnection> ccl = cache.get(cacheKey);
+      
+      if (ccl == null) {
+        ccl = new LinkedList<CachedConnection>();
+        cache.put(cacheKey, ccl);
+      }
+      
+      for (CachedConnection cachedConnection : ccl) {
+        if (!cachedConnection.isReserved()) {
+          cachedConnection.setReserved(true);
+          if (log.isTraceEnabled())
+            log.trace("Using existing connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
+          return cachedConnection.transport;
+        }
+      }
+    }
+    
+    return createNewTransport(cacheKey);
   }
   
   Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean
preferCachedConnection) throws TTransportException {
@@ -424,7 +439,7 @@ public class ThriftTransportPool {
           }
         }
       }
-
+      
       try {
         return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(),
createNewTransport(ttk));
       } catch (TTransportException tte) {
@@ -437,33 +452,6 @@ public class ThriftTransportPool {
     throw new TTransportException("Failed to connect to a server");
   }
   
-  public TTransport getTransport(String location, int port, long milliseconds) throws TTransportException
{
-    return getTransport(new ThriftTransportKey(location, port, milliseconds));
-  }
-  
-  private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException
{
-    synchronized (this) {
-      // atomically reserve location if it exist in cache
-      List<CachedConnection> ccl = cache.get(cacheKey);
-      
-      if (ccl == null) {
-        ccl = new LinkedList<CachedConnection>();
-        cache.put(cacheKey, ccl);
-      }
-      
-      for (CachedConnection cachedConnection : ccl) {
-        if (!cachedConnection.isReserved()) {
-          cachedConnection.setReserved(true);
-          if (log.isTraceEnabled())
-            log.trace("Using existing connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
-          return cachedConnection.transport;
-        }
-      }
-    }
-    
-    return createNewTransport(cacheKey);
-  }
-  
   private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException
{
     TTransport transport;
     if (cacheKey.getTimeout() == 0) {
@@ -508,7 +496,7 @@ public class ThriftTransportPool {
     CachedTTransport ctsc = (CachedTTransport) tsc;
     
     ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
-
+    
     synchronized (this) {
       List<CachedConnection> ccl = cache.get(ctsc.getCacheKey());
       for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();)
{
@@ -589,7 +577,7 @@ public class ThriftTransportPool {
     this.killTime = time;
     log.debug("Set thrift transport pool idle time to " + time);
   }
-
+  
   private static ThriftTransportPool instance = new ThriftTransportPool();
   private static final AtomicBoolean daemonStarted = new AtomicBoolean(false);
   

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java Thu May
16 16:42:29 2013
@@ -23,9 +23,7 @@ import org.apache.thrift.transport.TSock
 
 public class AddressUtil {
   static public InetSocketAddress parseAddress(String address, int defaultPort) throws NumberFormatException
{
-    String[] parts = address.split(":", 2);
-    if (address.contains("+"))
-      parts = address.split("\\+", 2);
+    String[] parts = extractPartsFromAddress(address);
     if (parts.length == 2) {
       if (parts[1].isEmpty())
         return new InetSocketAddress(parts[0], defaultPort);
@@ -33,7 +31,15 @@ public class AddressUtil {
     }
     return new InetSocketAddress(address, defaultPort);
   }
-  
+
+  static public InetSocketAddress parseAddress(String address) throws NumberFormatException
{
+    String[] parts = extractPartsFromAddress(address);
+    if (parts.length == 2)
+      return new InetSocketAddress(parts[0], Integer.parseInt(parts[1]));
+    else
+      throw new IllegalArgumentException("Address was expected to contain port. address="
+ address);
+  }
+
   static public InetSocketAddress parseAddress(Text address, int defaultPort) {
     return parseAddress(address.toString(), defaultPort);
   }
@@ -46,5 +52,12 @@ public class AddressUtil {
   static public String toString(InetSocketAddress addr) {
     return addr.getAddress().getHostAddress() + ":" + addr.getPort();
   }
-  
+
+  static private String[] extractPartsFromAddress(String address) {
+    String[] parts = address.split(":", 2);
+    if (address.contains("+"))
+      parts = address.split("\\+", 2);
+
+    return parts;
+  }
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ServerServices.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ServerServices.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ServerServices.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ServerServices.java Thu
May 16 16:42:29 2013
@@ -24,56 +24,42 @@ import org.apache.accumulo.core.conf.Pro
 
 public class ServerServices implements Comparable<ServerServices> {
   public static enum Service {
-    TSERV_CLIENT, MASTER_CLIENT, GC_CLIENT;
-    
-    // not necessary: everything should be advertizing ports in zookeeper
-    int getDefaultPort() {
-      switch (this) {
-        case TSERV_CLIENT:
-          return AccumuloConfiguration.getDefaultConfiguration().getPort(Property.TSERV_CLIENTPORT);
-        case MASTER_CLIENT:
-          return AccumuloConfiguration.getDefaultConfiguration().getPort(Property.MASTER_CLIENTPORT);
-        case GC_CLIENT:
-          return AccumuloConfiguration.getDefaultConfiguration().getPort(Property.GC_PORT);
-        default:
-          throw new IllegalArgumentException();
-      }
-    }
+    TSERV_CLIENT, GC_CLIENT;
   }
-  
+
   public static final String SERVICE_SEPARATOR = ";";
   public static final String SEPARATOR_CHAR = "=";
-  
+
   private EnumMap<Service,String> services;
   private String stringForm = null;
-  
+
   public ServerServices(String services) {
     this.services = new EnumMap<Service,String>(Service.class);
-    
+
     String[] addresses = services.split(SERVICE_SEPARATOR);
     for (String address : addresses) {
       String[] sa = address.split(SEPARATOR_CHAR, 2);
       this.services.put(Service.valueOf(sa[0]), sa[1]);
     }
   }
-  
+
   public ServerServices(String address, Service service) {
     this(service.name() + SEPARATOR_CHAR + address);
   }
-  
+
   public String getAddressString(Service service) {
     return services.get(service);
   }
-  
+
   public InetSocketAddress getAddress(Service service) {
     String address = getAddressString(service);
     String[] parts = address.split(":", 2);
     if (parts.length == 2) {
       if (parts[1].isEmpty())
-        return new InetSocketAddress(parts[0], service.getDefaultPort());
+        throw new IllegalArgumentException("Address was expected to have port but didn't.
address=" + address);
       return new InetSocketAddress(parts[0], Integer.parseInt(parts[1]));
     }
-    return new InetSocketAddress(address, service.getDefaultPort());
+    throw new IllegalArgumentException("Address was expected to have port but didn't. address="
+ address);
   }
   
   // DON'T CHANGE THIS; WE'RE USING IT FOR SERIALIZATION!!!
@@ -81,7 +67,7 @@ public class ServerServices implements C
     if (stringForm == null) {
       StringBuilder sb = new StringBuilder();
       String prefix = "";
-      for (Service service : new Service[] {Service.MASTER_CLIENT, Service.TSERV_CLIENT,
Service.GC_CLIENT}) {
+      for (Service service : new Service[] { Service.TSERV_CLIENT, Service.GC_CLIENT}) {
         if (services.containsKey(service)) {
           sb.append(prefix).append(service.name()).append(SEPARATOR_CHAR).append(services.get(service));
           prefix = SERVICE_SEPARATOR;

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java Thu May
16 16:42:29 2013
@@ -45,18 +45,17 @@ import org.apache.thrift.transport.TTran
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
-
 public class ThriftUtil {
   private static final Logger log = Logger.getLogger(ThriftUtil.class);
-
+  
   public static class TraceProtocol extends TCompactProtocol {
-
+    
     @Override
     public void writeMessageBegin(TMessage message) throws TException {
       Trace.start("client:" + message.name);
       super.writeMessageBegin(message);
     }
-
+    
     @Override
     public void writeMessageEnd() throws TException {
       super.writeMessageEnd();
@@ -64,7 +63,7 @@ public class ThriftUtil {
       if (currentTrace != null)
         currentTrace.stop();
     }
-
+    
     public TraceProtocol(TTransport transport) {
       super(transport);
     }
@@ -72,7 +71,7 @@ public class ThriftUtil {
   
   public static class TraceProtocolFactory extends TCompactProtocol.Factory {
     private static final long serialVersionUID = 1L;
-
+    
     @Override
     public TProtocol getProtocol(TTransport trans) {
       return new TraceProtocol(trans);
@@ -91,22 +90,19 @@ public class ThriftUtil {
     return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address,
conf));
   }
   
+  static public <T extends TServiceClient> T getClientNoTimeout(TServiceClientFactory<T>
factory, String address) throws TTransportException {
+    return createClient(factory, ThriftTransportPool.getInstance().getTransport(address,
0));
+  }
+  
   static public <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, String address, Property property, AccumuloConfiguration configuration)
       throws TTransportException {
-    int port = configuration.getPort(property);
-    TTransport transport = ThriftTransportPool.getInstance().getTransport(address, port);
+    long timeout = configuration.getTimeInMillis(property);
+    TTransport transport = ThriftTransportPool.getInstance().getTransport(address, timeout);
     return createClient(factory, transport);
   }
   
-  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, String address, Property property, Property timeoutProperty,
-      AccumuloConfiguration configuration) throws TTransportException {
-    return getClient(factory, address, property, configuration.getTimeInMillis(timeoutProperty),
configuration);
-  }
-  
-  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, String address, Property property, long timeout,
-      AccumuloConfiguration configuration) throws TTransportException {
-    int port = configuration.getPort(property);
-    TTransport transport = ThriftTransportPool.getInstance().getTransport(address, port,
timeout);
+  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, String address, long timeout) throws TTransportException {
+    TTransport transport = ThriftTransportPool.getInstance().getTransport(address, timeout);
     return createClient(factory, transport);
   }
   
@@ -117,13 +113,13 @@ public class ThriftUtil {
   }
   
   static public TabletClientService.Client getTServerClient(String address, AccumuloConfiguration
conf) throws TTransportException {
-    return getClient(new TabletClientService.Client.Factory(), address, Property.TSERV_CLIENTPORT,
Property.GENERAL_RPC_TIMEOUT, conf);
+    return getClient(new TabletClientService.Client.Factory(), address, Property.GENERAL_RPC_TIMEOUT,
conf);
   }
   
-  static public TabletClientService.Client getTServerClient(String address, AccumuloConfiguration
conf, long timeout) throws TTransportException {
-    return getClient(new TabletClientService.Client.Factory(), address, Property.TSERV_CLIENTPORT,
timeout, conf);
+  static public TabletClientService.Client getTServerClient(String address, long timeout)
throws TTransportException {
+    return getClient(new TabletClientService.Client.Factory(), address, timeout);
   }
-
+  
   public static void execute(String address, AccumuloConfiguration conf, ClientExec<TabletClientService.Client>
exec) throws AccumuloException,
       AccumuloSecurityException {
     while (true) {
@@ -168,12 +164,11 @@ public class ThriftUtil {
   /**
    * create a transport that is not pooled
    */
-  public static TTransport createTransport(String address, int port, AccumuloConfiguration
conf) throws TException {
+  public static TTransport createTransport(InetSocketAddress address, AccumuloConfiguration
conf) throws TException {
     TTransport transport = null;
     
     try {
-      transport = TTimeoutTransport.create(org.apache.accumulo.core.util.AddressUtil.parseAddress(address,
port),
-          conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+      transport = TTimeoutTransport.create(address, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
       transport = ThriftUtil.transportFactory().getTransport(transport);
       transport.open();
       TTransport tmp = transport;
@@ -185,15 +180,6 @@ public class ThriftUtil {
       if (transport != null)
         transport.close();
     }
-    
-
-  }
-
-  /**
-   * create a transport that is not pooled
-   */
-  public static TTransport createTransport(InetSocketAddress address, AccumuloConfiguration
conf) throws TException {
-    return createTransport(address.getAddress().getHostAddress(), address.getPort(), conf);
   }
 
   public static TTransportFactory transportFactory() {
@@ -201,22 +187,22 @@ public class ThriftUtil {
   }
   
   private final static Map<Integer,TTransportFactory> factoryCache = new HashMap<Integer,TTransportFactory>();
+  
   synchronized public static TTransportFactory transportFactory(int maxFrameSize) {
     TTransportFactory factory = factoryCache.get(maxFrameSize);
-    if(factory == null)
-    {
+    if (factory == null) {
       factory = new TFramedTransport.Factory(maxFrameSize);
-      factoryCache.put(maxFrameSize,factory);
+      factoryCache.put(maxFrameSize, factory);
     }
     return factory;
   }
-
+  
   synchronized public static TTransportFactory transportFactory(long maxFrameSize) {
-    if(maxFrameSize > Integer.MAX_VALUE || maxFrameSize < 1)
-      throw new RuntimeException("Thrift transport frames are limited to "+Integer.MAX_VALUE);
-    return transportFactory((int)maxFrameSize);
+    if (maxFrameSize > Integer.MAX_VALUE || maxFrameSize < 1)
+      throw new RuntimeException("Thrift transport frames are limited to " + Integer.MAX_VALUE);
+    return transportFactory((int) maxFrameSize);
   }
-
+  
   public static TProtocolFactory protocolFactory() {
     return protocolFactory;
   }

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/util/AddressUtilTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/util/AddressUtilTest.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/util/AddressUtilTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/util/AddressUtilTest.java Thu
May 16 16:42:29 2013
@@ -35,6 +35,8 @@ public class AddressUtilTest extends Tes
     assertTrue(addr2.equals(new InetSocketAddress("127.0.0.1", 1234)));
     InetSocketAddress addr3 = AddressUtil.parseAddress("127.0.0.1:", 12345);
     assertTrue(addr3.equals(new InetSocketAddress("127.0.0.1", 12345)));
+    InetSocketAddress addr4 = AddressUtil.parseAddress("127.0.0.1+456");
+    assertTrue(addr4.equals(new InetSocketAddress("127.0.0.1", 456)));
     try {
       AddressUtil.parseAddress("127.0.0.1:junk", 12345);
       fail("Number Format Exception Not Thrown");

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
Thu May 16 16:42:29 2013
@@ -582,7 +582,7 @@ public class BulkImporter {
       AccumuloSecurityException {
     try {
       long timeInMillis = instance.getConfiguration().getTimeInMillis(Property.TSERV_BULK_TIMEOUT);
-      TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration(),
timeInMillis);
+      TabletClientService.Iface client = ThriftUtil.getTServerClient(location, timeInMillis);
       try {
         HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>
files = new HashMap<KeyExtent,Map<String,org.apache.accumulo.core.data.thrift.MapFileInfo>>();
         for (Entry<KeyExtent,List<PathSize>> entry : assignmentsPerTablet.entrySet())
{

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
Thu May 16 16:42:29 2013
@@ -151,7 +151,7 @@ public class GarbageCollectWriteAheadLog
           }
         }
       } else {
-        InetSocketAddress address = AddressUtil.parseAddress(entry.getKey(), Property.TSERV_CLIENTPORT);
+        InetSocketAddress address = AddressUtil.parseAddress(entry.getKey());
         if (!holdsLock(address)) {
           Path serverPath = new Path(Constants.getWalDirectory(conf), entry.getKey());
           for (String filename : entry.getValue()) {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
Thu May 16 16:42:29 2013
@@ -289,7 +289,7 @@ public class LiveTServerSet implements W
       locklessServers.remove(server);
       ServerServices services = new ServerServices(new String(lockData));
       InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
-      InetSocketAddress addr = AddressUtil.parseAddress(server, Property.TSERV_CLIENTPORT);
+      InetSocketAddress addr = AddressUtil.parseAddress(server);
       TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner());
       
       if (info == null) {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java Thu
May 16 16:42:29 2013
@@ -699,7 +699,7 @@ public class Master implements LiveTServ
     public void shutdownTabletServer(TInfo info, TCredentials c, String tabletServer, boolean
force) throws ThriftSecurityException, TException {
       security.canPerformSystemActions(c);
       
-      final InetSocketAddress addr = AddressUtil.parseAddress(tabletServer, Property.TSERV_CLIENTPORT);
+      final InetSocketAddress addr = AddressUtil.parseAddress(tabletServer);
       final String addrString = org.apache.accumulo.core.util.AddressUtil.toString(addr);
       final TServerInstance doomed = tserverSet.find(addrString);
       if (!force) {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
Thu May 16 16:42:29 2013
@@ -82,7 +82,7 @@ public class TabletStateChangeIterator e
         String instance = parts[1];
         if (instance != null && instance.endsWith("]"))
           instance = instance.substring(0, instance.length() - 1);
-        result.add(new TServerInstance(AddressUtil.parseAddress(hostport, Property.TSERV_CLIENTPORT),
instance));
+        result.add(new TServerInstance(AddressUtil.parseAddress(hostport), instance));
       }
     }
     return result;

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/MasterServlet.java
Thu May 16 16:42:29 2013
@@ -44,6 +44,7 @@ import org.apache.accumulo.server.monito
 import org.apache.accumulo.server.monitor.util.celltypes.DurationType;
 import org.apache.accumulo.server.monitor.util.celltypes.NumberType;
 import org.apache.accumulo.server.monitor.util.celltypes.ProgressChartType;
+import org.apache.accumulo.server.monitor.util.celltypes.StringType;
 import org.apache.accumulo.server.util.AddressUtil;
 import org.apache.log4j.Level;
 
@@ -51,12 +52,10 @@ public class MasterServlet extends Basic
   
   private static final long serialVersionUID = 1L;
   
-  // private TableManager tableManager = TableManager.getInstance();
-  
   @Override
   protected String getTitle(HttpServletRequest req) {
     List<String> masters = Monitor.getInstance().getMasterLocations();
-    return "Master Server" + (masters.size() == 0 ? "" : ":" + AddressUtil.parseAddress(masters.get(0),
Property.MASTER_CLIENTPORT).getHostName());
+    return "Master Server" + (masters.size() == 0 ? "" : ":" + AddressUtil.parseAddress(masters.get(0)).getHostName());
   }
   
   @Override
@@ -132,7 +131,7 @@ public class MasterServlet extends Basic
       List<String> masters = Monitor.getInstance().getMasterLocations();
       
       Table masterStatus = new Table("masterStatus", "Master&nbsp;Status");
-      masterStatus.addSortableColumn("Master");
+      masterStatus.addSortableColumn("Master", new StringType<String>(), "The hostname
of the master server");
       masterStatus.addSortableColumn("#&nbsp;Online<br />Tablet&nbsp;Servers",
new PreciseNumberType((int) (slaves.size() * 0.8 + 1.0), slaves.size(),
           (int) (slaves.size() * 0.6 + 1.0), slaves.size()), "Number of tablet servers currently
available");
       masterStatus.addSortableColumn("#&nbsp;Total<br />Tablet&nbsp;Servers",
new PreciseNumberType(), "The total number of tablet servers configured");
@@ -150,7 +149,7 @@ public class MasterServlet extends Basic
       masterStatus.addSortableColumn("OS&nbsp;Load", new NumberType<Double>(0.,
guessHighLoad * 1., 0., guessHighLoad * 3.),
           "The one-minute load average on the computer that runs the monitor web server.");
       TableRow row = masterStatus.prepareRow();
-      row.add(masters.size() == 0 ? "<div class='error'>Down</div>" : AddressUtil.parseAddress(masters.get(0),
Property.MASTER_CLIENTPORT).getHostName());
+      row.add(masters.size() == 0 ? "<div class='error'>Down</div>" : AddressUtil.parseAddress(masters.get(0)).getHostName());
       row.add(Monitor.getMmi().tServerInfo.size());
       row.add(slaves.size());
       row.add("<a href='/gc'>" + gcStatus + "</a>");
@@ -167,7 +166,6 @@ public class MasterServlet extends Basic
       
     } else
       banner(sb, "error", "Master Server Not Running");
-    
   }
   
   private void doRecoveryList(HttpServletRequest req, StringBuilder sb) {
@@ -184,7 +182,7 @@ public class MasterServlet extends Basic
         if (server.logSorts != null) {
           for (RecoveryStatus recovery : server.logSorts) {
             TableRow row = recoveryTable.prepareRow();
-            row.add(AddressUtil.parseAddress(server.name, Property.TSERV_CLIENTPORT).getHostName());
+            row.add(AddressUtil.parseAddress(server.name).getHostName());
             row.add(recovery.name);
             row.add((long) recovery.runtime);
             row.add(recovery.progress);

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
Thu May 16 16:42:29 2013
@@ -2639,7 +2639,7 @@ public class TabletServer extends Abstra
       if (address == null) {
         return null;
       }
-      MasterClientService.Client client = ThriftUtil.getClient(new MasterClientService.Client.Factory(),
address, Property.MASTER_CLIENTPORT,
+      MasterClientService.Client client = ThriftUtil.getClient(new MasterClientService.Client.Factory(),
address,
           Property.GENERAL_RPC_TIMEOUT, getSystemConfiguration());
       // log.info("Listener API to master has been opened");
       return client;

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddressUtil.java Thu
May 16 16:42:29 2013
@@ -26,5 +26,8 @@ public class AddressUtil {
     final int dfaultPort = ServerConfiguration.getDefaultConfiguration().getPort(portDefaultProperty);
     return org.apache.accumulo.core.util.AddressUtil.parseAddress(address, dfaultPort);
   }
-  
+
+  static public InetSocketAddress parseAddress(String address) {
+    return org.apache.accumulo.core.util.AddressUtil.parseAddress(address);
+  }
 }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Admin.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Admin.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Admin.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Admin.java Thu May
16 16:42:29 2013
@@ -181,7 +181,7 @@ public class Admin {
   
   private static void stopTabletServer(Instance instance, final TCredentials creds, List<String>
servers, final boolean force) throws AccumuloException, AccumuloSecurityException {
     for (String server : servers) {
-      InetSocketAddress address = AddressUtil.parseAddress(server, Property.TSERV_CLIENTPORT);
+      InetSocketAddress address = AddressUtil.parseAddress(server);
       final String finalServer = org.apache.accumulo.core.util.AddressUtil.toString(address);
       log.info("Stopping server " + finalServer);
       MasterClient.execute(HdfsZooInstance.getInstance(), new ClientExec<MasterClientService.Client>()
{

Modified: accumulo/trunk/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java?rev=1483437&r1=1483436&r2=1483437&view=diff
==============================================================================
--- accumulo/trunk/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java
(original)
+++ accumulo/trunk/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java
Thu May 16 16:42:29 2013
@@ -49,7 +49,7 @@ public class StopTabletServer extends Te
           Stat stat = new Stat();
           byte[] data = rdr.getData(base + "/" + child + "/" + children.get(0), stat);
           if (!"master".equals(new String(data))) {
-            result.add(new TServerInstance(AddressUtil.parseAddress(child, Property.TSERV_CLIENTPORT),
stat.getEphemeralOwner()));
+            result.add(new TServerInstance(AddressUtil.parseAddress(child), stat.getEphemeralOwner()));
           }
         }
       } catch (KeeperException.NoNodeException ex) {



Mime
View raw message