accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [1/3] accumulo git commit: ACCUMULO-3425 Fix style/formatter/whitespace
Date Tue, 16 Dec 2014 23:51:47 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 0354a7c88 -> f3878f5f6


ACCUMULO-3425 Fix style/formatter/whitespace


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dfb66c35
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dfb66c35
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dfb66c35

Branch: refs/heads/master
Commit: dfb66c35358da2bb49704038f4f22ec2eaaaf96e
Parents: 0354a7c
Author: Josh Elser <elserj@apache.org>
Authored: Tue Dec 16 15:46:44 2014 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Tue Dec 16 16:42:31 2014 -0500

----------------------------------------------------------------------
 .../core/client/impl/ThriftTransportPool.java   | 187 +++++++++----------
 .../apache/accumulo/core/rpc/ThriftUtil.java    |   6 +-
 .../accumulo/server/rpc/TServerUtils.java       |   7 +-
 3 files changed, 99 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfb66c35/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index bdb04d9..c159e8b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -42,41 +42,41 @@ import com.google.common.net.HostAndPort;
 
 public class ThriftTransportPool {
   private static SecurityPermission TRANSPORT_POOL_PERMISSION = new SecurityPermission("transportPoolPermission");
-  
+
   private static final Random random = new Random();
   private long killTime = 1000 * 3;
-  
+
   private Map<ThriftTransportKey,List<CachedConnection>> cache = new HashMap<ThriftTransportKey,List<CachedConnection>>();
   private Map<ThriftTransportKey,Long> errorCount = new HashMap<ThriftTransportKey,Long>();
   private Map<ThriftTransportKey,Long> errorTime = new HashMap<ThriftTransportKey,Long>();
   private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<ThriftTransportKey>();
 
   private CountDownLatch closerExitLatch;
-  
+
   private static final Logger log = Logger.getLogger(ThriftTransportPool.class);
-  
+
   private static final Long ERROR_THRESHOLD = 20l;
   private static final int STUCK_THRESHOLD = 2 * 60 * 1000;
-  
+
   private static class CachedConnection {
-    
+
     public CachedConnection(CachedTTransport t) {
       this.transport = t;
     }
-    
+
     void setReserved(boolean reserved) {
       this.transport.setReserved(reserved);
     }
-    
+
     boolean isReserved() {
       return this.transport.reserved;
     }
-    
+
     CachedTTransport transport;
-    
+
     long lastReturnTime;
   }
-  
+
   public static class TransportPoolShutdownException extends RuntimeException {
     private static final long serialVersionUID = 1L;
   }
@@ -84,36 +84,36 @@ public class ThriftTransportPool {
   private static class Closer implements Runnable {
     final ThriftTransportPool pool;
     private CountDownLatch closerExitLatch;
-    
+
     public Closer(ThriftTransportPool pool, CountDownLatch closerExitLatch) {
       this.pool = pool;
       this.closerExitLatch = closerExitLatch;
     }
-    
+
     private void closeConnections() {
       while (true) {
-        
+
         ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>();
-        
+
         synchronized (pool) {
           for (List<CachedConnection> ccl : pool.getCache().values()) {
             Iterator<CachedConnection> iter = ccl.iterator();
             while (iter.hasNext()) {
               CachedConnection cachedConnection = iter.next();
-              
+
               if (!cachedConnection.isReserved() && System.currentTimeMillis() -
cachedConnection.lastReturnTime > pool.killTime) {
                 connectionsToClose.add(cachedConnection);
                 iter.remove();
               }
             }
           }
-          
+
           for (List<CachedConnection> ccl : pool.getCache().values()) {
             for (CachedConnection cachedConnection : ccl) {
               cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD);
             }
           }
-          
+
           Iterator<Entry<ThriftTransportKey,Long>> iter = pool.errorTime.entrySet().iterator();
           while (iter.hasNext()) {
             Entry<ThriftTransportKey,Long> entry = iter.next();
@@ -124,12 +124,12 @@ public class ThriftTransportPool {
             }
           }
         }
-        
+
         // close connections outside of sync block
         for (CachedConnection cachedConnection : connectionsToClose) {
           cachedConnection.transport.close();
         }
-        
+
         try {
           Thread.sleep(500);
         } catch (InterruptedException e) {
@@ -142,32 +142,31 @@ public class ThriftTransportPool {
     public void run() {
       try {
         closeConnections();
-      } catch (TransportPoolShutdownException e) {
-      } finally {
+      } catch (TransportPoolShutdownException e) {} finally {
         closerExitLatch.countDown();
       }
     }
   }
-  
+
   static class CachedTTransport extends TTransport {
-    
+
     private ThriftTransportKey cacheKey;
     private TTransport wrappedTransport;
     private boolean sawError = false;
-    
+
     private volatile String ioThreadName = null;
     private volatile long ioStartTime = 0;
     private volatile boolean reserved = false;
-    
+
     private String stuckThreadName = null;
-    
+
     int ioCount = 0;
     int lastIoCount = -1;
-    
+
     private void sawError(Exception e) {
       sawError = true;
     }
-    
+
     final void setReserved(boolean reserved) {
       this.reserved = reserved;
       if (reserved) {
@@ -181,22 +180,22 @@ public class ThriftTransportPool {
           log.warn("Connection returned to thrift connection pool that may still be in use
" + ioThreadName + " " + Thread.currentThread().getName(),
               new Exception());
         }
-        
+
         ioCount = 0;
         lastIoCount = -1;
         ioThreadName = null;
       }
       checkForStuckIO(STUCK_THRESHOLD);
     }
-    
+
     final void checkForStuckIO(long threshold) {
       /*
        * checking for stuck io needs to be light weight.
-       * 
+       *
        * Tried to call System.currentTimeMillis() and Thread.currentThread() before every
io operation.... this dramatically slowed things down. So switched to
        * incrementing a counter before and after each io operation.
        */
-      
+
       if ((ioCount & 1) == 1) {
         // when ioCount is odd, it means I/O is currently happening
         if (ioCount == lastIoCount) {
@@ -212,7 +211,7 @@ public class ThriftTransportPool {
           // if it changes
           lastIoCount = ioCount;
           ioStartTime = System.currentTimeMillis();
-          
+
           if (stuckThreadName != null) {
             // doing I/O, but ioCount changed so no longer stuck
             log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO  to " + cacheKey
+ " sawError = " + sawError);
@@ -228,17 +227,17 @@ public class ThriftTransportPool {
         }
       }
     }
-    
+
     public CachedTTransport(TTransport transport, ThriftTransportKey cacheKey2) {
       this.wrappedTransport = transport;
       this.cacheKey = cacheKey2;
     }
-    
+
     @Override
     public boolean isOpen() {
       return wrappedTransport.isOpen();
     }
-    
+
     @Override
     public void open() throws TTransportException {
       try {
@@ -251,7 +250,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public int read(byte[] arg0, int arg1, int arg2) throws TTransportException {
       try {
@@ -264,7 +263,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public int readAll(byte[] arg0, int arg1, int arg2) throws TTransportException {
       try {
@@ -277,7 +276,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public void write(byte[] arg0, int arg1, int arg2) throws TTransportException {
       try {
@@ -290,7 +289,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public void write(byte[] arg0) throws TTransportException {
       try {
@@ -303,7 +302,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public void close() {
       try {
@@ -312,9 +311,9 @@ public class ThriftTransportPool {
       } finally {
         ioCount++;
       }
-      
+
     }
-    
+
     @Override
     public void flush() throws TTransportException {
       try {
@@ -327,7 +326,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public boolean peek() {
       try {
@@ -337,7 +336,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public byte[] getBuffer() {
       try {
@@ -347,7 +346,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public int getBufferPosition() {
       try {
@@ -357,7 +356,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public int getBytesRemainingInBuffer() {
       try {
@@ -367,7 +366,7 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     @Override
     public void consumeBuffer(int len) {
       try {
@@ -377,33 +376,33 @@ public class ThriftTransportPool {
         ioCount++;
       }
     }
-    
+
     public ThriftTransportKey getCacheKey() {
       return cacheKey;
     }
-    
+
   }
-  
+
   private ThriftTransportPool() {}
-  
+
   public TTransport getTransportWithDefaultTimeout(HostAndPort addr, ClientContext context)
throws TTransportException {
     return getTransport(String.format("%s:%d", addr.getHostText(), addr.getPort()), context.getClientTimeoutInMillis(),
context);
   }
-  
+
   public TTransport getTransport(String location, long milliseconds, ClientContext context)
throws TTransportException {
     return getTransport(new ThriftTransportKey(location, milliseconds, context));
   }
-  
+
   private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException
{
     synchronized (this) {
       // atomically reserve location if it exist in cache
       List<CachedConnection> ccl = getCache().get(cacheKey);
-      
+
       if (ccl == null) {
         ccl = new LinkedList<CachedConnection>();
         getCache().put(cacheKey, ccl);
       }
-      
+
       for (CachedConnection cachedConnection : ccl) {
         if (!cachedConnection.isReserved()) {
           cachedConnection.setReserved(true);
@@ -413,26 +412,26 @@ public class ThriftTransportPool {
         }
       }
     }
-    
+
     return createNewTransport(cacheKey);
   }
-  
+
   Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean
preferCachedConnection) throws TTransportException {
-    
+
     servers = new ArrayList<ThriftTransportKey>(servers);
-    
+
     if (preferCachedConnection) {
       HashSet<ThriftTransportKey> serversSet = new HashSet<ThriftTransportKey>(servers);
-      
+
       synchronized (this) {
-        
+
         // randomly pick a server from the connection cache
         serversSet.retainAll(getCache().keySet());
-        
+
         if (serversSet.size() > 0) {
           ArrayList<ThriftTransportKey> cachedServers = new ArrayList<ThriftTransportKey>(serversSet);
           Collections.shuffle(cachedServers, random);
-          
+
           for (ThriftTransportKey ttk : cachedServers) {
             for (CachedConnection cachedConnection : getCache().get(ttk)) {
               if (!cachedConnection.isReserved()) {
@@ -446,12 +445,12 @@ public class ThriftTransportPool {
         }
       }
     }
-    
+
     int retryCount = 0;
     while (servers.size() > 0 && retryCount < 10) {
       int index = random.nextInt(servers.size());
       ThriftTransportKey ttk = servers.get(index);
-      
+
       if (!preferCachedConnection) {
         synchronized (this) {
           List<CachedConnection> cachedConnList = getCache().get(ttk);
@@ -467,7 +466,7 @@ public class ThriftTransportPool {
           }
         }
       }
-      
+
       try {
         return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(),
createNewTransport(ttk));
       } catch (TTransportException tte) {
@@ -476,22 +475,22 @@ public class ThriftTransportPool {
         retryCount++;
       }
     }
-    
+
     throw new TTransportException("Failed to connect to a server");
   }
-  
+
   private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException
{
     TTransport transport = ThriftUtil.createClientTransport(HostAndPort.fromParts(cacheKey.getLocation(),
cacheKey.getPort()), (int) cacheKey.getTimeout(),
         cacheKey.getSslParams());
 
     if (log.isTraceEnabled())
       log.trace("Creating new connection to connection to " + cacheKey.getLocation() + ":"
+ cacheKey.getPort());
-    
+
     CachedTTransport tsc = new CachedTTransport(transport, cacheKey);
-    
+
     CachedConnection cc = new CachedConnection(tsc);
     cc.setReserved(true);
-    
+
     try {
       synchronized (this) {
         List<CachedConnection> ccl = getCache().get(cacheKey);
@@ -500,7 +499,7 @@ public class ThriftTransportPool {
           ccl = new LinkedList<CachedConnection>();
           getCache().put(cacheKey, ccl);
         }
-      
+
         ccl.add(cc);
       }
     } catch (TransportPoolShutdownException e) {
@@ -509,17 +508,17 @@ public class ThriftTransportPool {
     }
     return cc.transport;
   }
-  
+
   public void returnTransport(TTransport tsc) {
     if (tsc == null) {
       return;
     }
-    
+
     boolean existInCache = false;
     CachedTTransport ctsc = (CachedTTransport) tsc;
-    
+
     ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
-    
+
     synchronized (this) {
       List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey());
       for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();)
{
@@ -528,33 +527,33 @@ public class ThriftTransportPool {
           if (ctsc.sawError) {
             closeList.add(cachedConnection);
             iterator.remove();
-            
+
             if (log.isTraceEnabled())
               log.trace("Returned connection had error " + ctsc.getCacheKey());
-            
+
             Long ecount = errorCount.get(ctsc.getCacheKey());
             if (ecount == null)
               ecount = 0l;
             ecount++;
             errorCount.put(ctsc.getCacheKey(), ecount);
-            
+
             Long etime = errorTime.get(ctsc.getCacheKey());
             if (etime == null) {
               errorTime.put(ctsc.getCacheKey(), System.currentTimeMillis());
             }
-            
+
             if (ecount >= ERROR_THRESHOLD && !serversWarnedAbout.contains(ctsc.getCacheKey()))
{
               log.warn("Server " + ctsc.getCacheKey() + " had " + ecount + " failures in
a short time period, will not complain anymore ");
               serversWarnedAbout.add(ctsc.getCacheKey());
             }
-            
+
             cachedConnection.setReserved(false);
-            
+
           } else {
-            
+
             if (log.isTraceEnabled())
               log.trace("Returned connection " + ctsc.getCacheKey() + " ioCount : " + cachedConnection.transport.ioCount);
-            
+
             cachedConnection.lastReturnTime = System.currentTimeMillis();
             cachedConnection.setReserved(false);
           }
@@ -562,7 +561,7 @@ public class ThriftTransportPool {
           break;
         }
       }
-      
+
       // remove all unreserved cached connection when a sever has an error, not just the
connection that was returned
       if (ctsc.sawError) {
         for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();)
{
@@ -574,7 +573,7 @@ public class ThriftTransportPool {
         }
       }
     }
-    
+
     // close outside of sync block
     for (CachedConnection cachedConnection : closeList) {
       try {
@@ -583,14 +582,14 @@ public class ThriftTransportPool {
         log.debug("Failed to close connection w/ errors", e);
       }
     }
-    
+
     if (!existInCache) {
       log.warn("Returned tablet server connection to cache that did not come from cache");
       // close outside of sync block
       tsc.close();
     }
   }
-  
+
   /**
    * Set the time after which idle connections should be closed
    */
@@ -598,16 +597,16 @@ public class ThriftTransportPool {
     this.killTime = time;
     log.debug("Set thrift transport pool idle time to " + time);
   }
-  
+
   private static ThriftTransportPool instance = new ThriftTransportPool();
   private static final AtomicBoolean daemonStarted = new AtomicBoolean(false);
-  
+
   public static ThriftTransportPool getInstance() {
     SecurityManager sm = System.getSecurityManager();
     if (sm != null) {
       sm.checkPermission(TRANSPORT_POOL_PERMISSION);
     }
-    
+
     if (daemonStarted.compareAndSet(false, true)) {
       CountDownLatch closerExitLatch = new CountDownLatch(1);
       new Daemon(new Closer(instance, closerExitLatch), "Thrift Connection Pool Checker").start();
@@ -615,7 +614,7 @@ public class ThriftTransportPool {
     }
     return instance;
   }
-  
+
   private synchronized void setCloserExitLatch(CountDownLatch closerExitLatch) {
     this.closerExitLatch = closerExitLatch;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfb66c35/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index a3cb252..7f4609b 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -105,8 +105,7 @@ public class ThriftUtil {
     return factory.getClient(protocolFactory.getProtocol(transport), protocolFactory.getProtocol(transport));
   }
 
-  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, HostAndPort address, ClientContext context)
-      throws TTransportException {
+  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, HostAndPort address, ClientContext context) throws TTransportException {
     return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address,
context));
   }
 
@@ -115,8 +114,7 @@ public class ThriftUtil {
     return getClient(factory, address, context, 0);
   }
 
-  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, String address, ClientContext context)
-      throws TTransportException {
+  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, String address, ClientContext context) throws TTransportException {
     TTransport transport = ThriftTransportPool.getInstance().getTransport(address, context.getClientTimeoutInMillis(),
context);
     return createClient(factory, transport);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/dfb66c35/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 1bd337c..9a74357 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -193,8 +193,9 @@ public class TServerUtils {
     return new ServerAddress(createThreadPoolServer(transport, processor), address);
   }
 
-  public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address,
TProcessor processor, String serverName, String threadName, int numThreads, int numSTThreads,
-      long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long
sslSocketTimeout) throws TTransportException {
+  public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address,
TProcessor processor, String serverName, String threadName,
+      int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize,
SslConnectionParams sslParams, long sslSocketTimeout)
+      throws TTransportException {
     return startTServer(address, new TimedProcessor(conf, processor, serverName, threadName),
serverName, threadName, numThreads, numSTThreads,
         timeBetweenThreadChecks, maxMessageSize, sslParams, sslSocketTimeout);
   }
@@ -205,7 +206,7 @@ public class TServerUtils {
    * @return A ServerAddress encapsulating the Thrift server created and the host/port which
it is bound to.
    */
   public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor,
String serverName, String threadName, int numThreads,
-    int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams
sslParams, long sslSocketTimeout) throws TTransportException {
+      int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams
sslParams, long sslSocketTimeout) throws TTransportException {
 
     ServerAddress serverAddress;
     if (sslParams != null) {


Mime
View raw message