incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [5/5] git commit: Fixing a number of thread safety and logic issues in the client pool.
Date Mon, 13 Jan 2014 18:53:22 GMT
Fixing a number of thread safety and logic issues in the client pool.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/2498bcb9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/2498bcb9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/2498bcb9

Branch: refs/heads/apache-blur-0.2
Commit: 2498bcb9154eab4bb044cd2f19c908e272e9071d
Parents: c7905dd
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Jan 13 13:44:51 2014 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Jan 13 13:44:51 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/blur/thrift/ClientPool.java | 144 ++++++++++---------
 1 file changed, 79 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2498bcb9/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java b/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
index 9407ee5..667817a 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
@@ -26,10 +26,8 @@ import java.net.InetSocketAddress;
 import java.net.Proxy;
 import java.net.Proxy.Type;
 import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -39,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.thirdparty.thrift_0_9_0.TException;
 import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TBinaryProtocol;
 import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol;
 import org.apache.blur.thirdparty.thrift_0_9_0.transport.TFramedTransport;
@@ -62,9 +61,8 @@ public class ClientPool {
     try {
       BlurConfiguration config = new BlurConfiguration();
       _idleTimeBeforeClosingClient = config.getLong(BLUR_CLIENTPOOL_CLIENT_CLOSE_THRESHOLD,
-          TimeUnit.SECONDS.toMillis(30));
-      _clientPoolCleanFrequency = config
-          .getLong(BLUR_CLIENTPOOL_CLIENT_CLEAN_FREQUENCY, TimeUnit.SECONDS.toMillis(300));
+          TimeUnit.SECONDS.toNanos(30));
+      _clientPoolCleanFrequency = config.getLong(BLUR_CLIENTPOOL_CLIENT_CLEAN_FREQUENCY, TimeUnit.SECONDS.toMillis(3));
       _maxFrameSize = config.getInt(BLUR_THRIFT_MAX_FRAME_SIZE, 16384000);
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -77,73 +75,62 @@ public class ClientPool {
       @Override
       public void run() {
         while (_running.get()) {
+          for (Entry<Connection, BlockingQueue<Client>> e : _connMap.entrySet()) {
+            testConnections(e.getKey(), e.getValue());
+          }
           try {
             Thread.sleep(getClientPoolCleanFrequency());
-            List<Thread> workers = new ArrayList<Thread>();
-            int num = 0;
-            for (Connection connection : _connMap.keySet()) {
-              Thread thread = new PoolWorker(connection);
-              thread.setName("client-cleaner_" + ++num);
-              thread.start();
-              workers.add(thread);
-            }
-            for (Thread t : workers) {
-              t.join();
-            }
           } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+            return;
           }
         }
       }
-    });
-    _master.setDaemon(true);
-    _master.setName("Blur-Client-Connection-Cleaner");
-    _master.start();
-  }
 
-  private static class PoolWorker extends Thread {
-    private final Connection _connection;
-
-    public PoolWorker(Connection conn) {
-      _connection = conn;
-    }
-
-    @Override
-    public void run() {
-      BlockingQueue<Client> bq = _connMap.get(_connection);
-      synchronized (_connection) {
-        if (!_connMap.get(_connection).isEmpty()) {
-          Iterator<Client> it = bq.iterator();
-          try {
-            while (it.hasNext()) {
-              Client client = it.next();
-              if (((WeightedClient) client).isStale()) {
-                close(client);
-                bq.take();
-              } else
-                break;
+      private void testConnections(Connection connection, BlockingQueue<Client> clients) {
+        LOG.debug("Testing clients for connection [{0}]", connection);
+        int size = clients.size();
+        for (int i = 0; i < size; i++) {
+          WeightedClient weightedClient = (WeightedClient) clients.poll();
+          if (weightedClient == null) {
+            return;
+          }
+          if (weightedClient.isStale()) {
+            if (testClient(connection, weightedClient)) {
+              tryToReturnToQueue(clients, weightedClient);
+            } else {
+              close(weightedClient);
             }
-          } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+          } else {
+            tryToReturnToQueue(clients, weightedClient);
           }
         }
       }
-    }
+
+      private void tryToReturnToQueue(BlockingQueue<Client> clients, WeightedClient weightedClient) {
+        if (!clients.offer(weightedClient)) {
+          // Close client
+          close(weightedClient);
+        }
+      }
+    });
+    _master.setDaemon(true);
+    _master.setName("Blur-Client-Connection-Cleaner");
+    _master.start();
   }
 
   private class WeightedClient extends SafeClientGen {
-    private long _enqueueTime;
+    private long _lastUse = System.nanoTime();
 
     public WeightedClient(TProtocol prot) {
       super(prot);
     }
 
-    public void setEnqueTime(long _currentTime) {
-      _enqueueTime = _currentTime;
+    public void touch() {
+      _lastUse = System.nanoTime();
     }
 
     public boolean isStale() {
-      long diff = System.currentTimeMillis() - _enqueueTime;
+      long diff = System.nanoTime() - _lastUse;
       return diff >= getClientIdleTimeThreshold();
     }
   }
@@ -157,17 +144,19 @@ public class ClientPool {
   }
 
   public void returnClient(Connection connection, Client client) {
-    try {
-      ((WeightedClient) client).setEnqueTime(System.currentTimeMillis());
-      getQueue(connection).put(client);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
+    ((WeightedClient) client).touch();
+    if (!getQueue(connection).offer(client)) {
+      // Close client
+      close(client);
     }
   }
 
   private BlockingQueue<Client> getQueue(Connection connection) {
-    BlockingQueue<Client> blockingQueue;
-    synchronized (connection) {
+    BlockingQueue<Client> blockingQueue = _connMap.get(connection);
+    if (blockingQueue != null) {
+      return blockingQueue;
+    }
+    synchronized (_connMap) {
       blockingQueue = _connMap.get(connection);
       if (blockingQueue == null) {
         blockingQueue = getNewQueue();
@@ -179,7 +168,7 @@ public class ClientPool {
 
   public void trashConnections(Connection connection, Client client) {
     BlockingQueue<Client> blockingQueue;
-    synchronized (connection) {
+    synchronized (_connMap) {
       blockingQueue = _connMap.put(connection, getNewQueue());
       try {
         blockingQueue.put(client);
@@ -202,10 +191,19 @@ public class ClientPool {
     if (blockingQueue.isEmpty()) {
       return newClient(connection);
     }
-    try {
-      return blockingQueue.take();
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
+    while (true) {
+      WeightedClient client = (WeightedClient) blockingQueue.poll();
+      if (client == null) {
+        return newClient(connection);
+      }
+      if (client.isStale()) {
+        // Test client
+        if (testClient(connection, client)) {
+          return client;
+        }
+      } else {
+        return client;
+      }
     }
   }
 
@@ -230,8 +228,24 @@ public class ClientPool {
     return new WeightedClient(proto);
   }
 
+  private static boolean testClient(Connection connection, WeightedClient weightedClient) {
+    LOG.debug("Testing client, could be stale. Client [{0}] for connection [{1}]", weightedClient, connection);
+    try {
+      weightedClient.ping();
+      weightedClient.touch();
+      return true;
+    } catch (TException e) {
+      LOG.error("Client test failed. Destroying client. Client [{0}] for connection [{1}]", weightedClient, connection);
+      return false;
+    }
+  }
+
   public static void close(Client client) {
-    client.getInputProtocol().getTransport().close();
-    client.getOutputProtocol().getTransport().close();
+    try {
+      client.getInputProtocol().getTransport().close();
+      client.getOutputProtocol().getTransport().close();
+    } catch (Exception e) {
+      LOG.error("Error during closing of client [{0}].", client);
+    }
   }
 }


Mime
View raw message