incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: Extracted the client pool logic into its own class.
Date Fri, 02 Aug 2013 14:18:14 GMT
Updated Branches:
  refs/heads/master a93ab7a68 -> 7970c9068


Extracted the client pool logic into its own class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/14fba2ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/14fba2ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/14fba2ed

Branch: refs/heads/master
Commit: 14fba2ed307c94a02277bac28690d14666aae27b
Parents: a93ab7a
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Aug 2 09:49:47 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Aug 2 09:49:47 2013 -0400

----------------------------------------------------------------------
 .../apache/blur/thrift/BlurClientManager.java   | 115 +++++------------
 .../java/org/apache/blur/thrift/ClientPool.java | 125 +++++++++++++++++++
 2 files changed, 155 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/14fba2ed/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
index 46367c1..1c41d48 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
@@ -17,10 +17,6 @@ package org.apache.blur.thrift;
  * limitations under the License.
  */
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Proxy;
-import java.net.Proxy.Type;
-import java.net.Socket;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,9 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -40,14 +34,10 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.thirdparty.thrift_0_9_0.TException;
-import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TBinaryProtocol;
-import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TFramedTransport;
-import org.apache.blur.thirdparty.thrift_0_9_0.transport.TSocket;
 import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
 import org.apache.blur.thrift.generated.Blur;
-import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.Blur.Client;
+import org.apache.blur.thrift.generated.BlurException;
 
 public class BlurClientManager {
 
@@ -59,31 +49,39 @@ public class BlurClientManager {
   public static final long MAX_BACK_OFF_TIME = TimeUnit.SECONDS.toMillis(10);
   private static final long ONE_SECOND = TimeUnit.SECONDS.toMillis(1);
 
-  private static Map<Connection, BlockingQueue<Client>> clientPool = new ConcurrentHashMap<Connection,
BlockingQueue<Client>>();
-  private static Thread daemon;
-  private static AtomicBoolean running = new AtomicBoolean(true);
-  private static Map<Connection, Object> badConnections = new ConcurrentHashMap<Connection,
Object>();
+  private static ClientPool _clientPool = new ClientPool();
+  private static Thread _daemon;
+  private static AtomicBoolean _running = new AtomicBoolean(true);
+  private static Map<Connection, Object> _badConnections = new ConcurrentHashMap<Connection,
Object>();
 
   static {
     startDaemon();
   }
 
+  public static void setClientPool(ClientPool clientPool) {
+    _clientPool = clientPool;
+  }
+
+  public static ClientPool getClientPool() {
+    return _clientPool;
+  }
+
   private static void startDaemon() {
-    daemon = new Thread(new Runnable() {
+    _daemon = new Thread(new Runnable() {
       private Set<Connection> good = new HashSet<Connection>();
 
       @Override
       public void run() {
-        while (running.get()) {
+        while (_running.get()) {
           good.clear();
-          Set<Connection> badConns = badConnections.keySet();
+          Set<Connection> badConns = _badConnections.keySet();
           for (Connection connection : badConns) {
             if (isConnectionGood(connection)) {
               good.add(connection);
             }
           }
           for (Connection connection : good) {
-            badConnections.remove(connection);
+            _badConnections.remove(connection);
           }
           try {
             Thread.sleep(ONE_SECOND);
@@ -93,14 +91,14 @@ public class BlurClientManager {
         }
       }
     });
-    daemon.setDaemon(true);
-    daemon.setName("Blur-Client-Manager-Connection-Checker");
-    daemon.start();
+    _daemon.setDaemon(true);
+    _daemon.setName("Blur-Client-Manager-Connection-Checker");
+    _daemon.start();
   }
 
   protected static boolean isConnectionGood(Connection connection) {
     try {
-      returnClient(connection, getClient(connection));
+      returnClient(connection, _clientPool.getClient(connection));
       return true;
     } catch (TTransportException e) {
       LOG.debug("Connection [{0}] is still bad.", connection);
@@ -154,7 +152,7 @@ public class BlurClientManager {
         }
         client.set(null);
         try {
-          client.set(getClient(connection));
+          client.set(_clientPool.getClient(connection));
         } catch (IOException e) {
           if (handleError(connection, client, retries, command, e, maxRetries, backOffTime,
maxBackOffTime)) {
             throw e;
@@ -209,11 +207,11 @@ public class BlurClientManager {
 
   private static void markBadConnection(Connection connection) {
     LOG.info("Marking bad connection [{0}]", connection);
-    badConnections.put(connection, NULL);
+    _badConnections.put(connection, NULL);
   }
 
   private static boolean isBadConnection(Connection connection) {
-    return badConnections.containsKey(connection);
+    return _badConnections.containsKey(connection);
   }
 
   private static <CLIENT, T> boolean handleError(Connection connection, AtomicReference<Blur.Client>
client,
@@ -279,73 +277,20 @@ public class BlurClientManager {
     returnClient(connection, client.get());
   }
 
-  public static void returnClient(Connection connection, Blur.Client client) {
-    try {
-      clientPool.get(connection).put(client);
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   private static void trashConnections(Connection connection, AtomicReference<Client>
c) {
-    BlockingQueue<Client> blockingQueue;
-    synchronized (clientPool) {
-      blockingQueue = clientPool.put(connection, new LinkedBlockingQueue<Client>());
-      try {
-        blockingQueue.put(c.get());
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    LOG.info("Trashing client for connections [{0}]", connection);
-    for (Client client : blockingQueue) {
-      close(client);
-    }
+    _clientPool.trashConnections(connection, c.get());
   }
 
-  public static void close(Client client) {
-    client.getInputProtocol().getTransport().close();
-    client.getOutputProtocol().getTransport().close();
+  public static void returnClient(Connection connection, Blur.Client client) {
+    _clientPool.returnClient(connection, client);
   }
 
-  private static Client getClient(Connection connection) throws TTransportException, IOException
{
-    BlockingQueue<Client> blockingQueue;
-    synchronized (clientPool) {
-      blockingQueue = clientPool.get(connection);
-      if (blockingQueue == null) {
-        blockingQueue = new LinkedBlockingQueue<Client>();
-        clientPool.put(connection, blockingQueue);
-      }
-    }
-    if (blockingQueue.isEmpty()) {
-      return newClient(connection);
-    }
-    try {
-      return blockingQueue.take();
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
+  public static void close(Client client) {
+    _clientPool.close(client);
   }
 
   public static Client newClient(Connection connection) throws TTransportException, IOException
{
-    String host = connection.getHost();
-    int port = connection.getPort();
-    TSocket trans;
-    Socket socket;
-    if (connection.isProxy()) {
-      Proxy proxy = new Proxy(Type.SOCKS, new InetSocketAddress(connection.getProxyHost(),
connection.getProxyPort()));
-      socket = new Socket(proxy);
-    } else {
-      socket = new Socket();
-    }
-    socket.setTcpNoDelay(true);
-    socket.connect(new InetSocketAddress(host, port));
-    trans = new TSocket(socket);
-
-    TProtocol proto = new TBinaryProtocol(new TFramedTransport(trans));
-    Client client = new Client(proto);
-    return client;
+    return _clientPool.newClient(connection);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/14fba2ed/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java b/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
new file mode 100644
index 0000000..5e7136c
--- /dev/null
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
@@ -0,0 +1,125 @@
+package org.apache.blur.thrift;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.Proxy.Type;
+import java.net.Socket;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TBinaryProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TFramedTransport;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TSocket;
+import org.apache.blur.thirdparty.thrift_0_9_0.transport.TTransportException;
+import org.apache.blur.thrift.generated.Blur.Client;
+
+public class ClientPool {
+
+  private static final Log LOG = LogFactory.getLog(ClientPool.class);
+  private final Map<Connection, BlockingQueue<Client>> _clientPool = new ConcurrentHashMap<Connection,
BlockingQueue<Client>>();
+  private int _maxConnectionsPerHost = Integer.MAX_VALUE;
+
+  // private long _idleTimeBeforeClosingClient = Long.MAX_VALUE;
+
+  public void returnClient(Connection connection, Client client) {
+    try {
+      getQueue(connection).put(client);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private BlockingQueue<Client> getQueue(Connection connection) {
+    BlockingQueue<Client> blockingQueue = _clientPool.get(connection);
+    synchronized (_clientPool) {
+      blockingQueue = _clientPool.get(connection);
+      if (blockingQueue == null) {
+        blockingQueue = getNewQueue();
+        _clientPool.put(connection, blockingQueue);
+      }
+    }
+    return _clientPool.get(connection);
+  }
+
+  public void trashConnections(Connection connection, Client client) {
+    BlockingQueue<Client> blockingQueue;
+    synchronized (_clientPool) {
+      blockingQueue = _clientPool.put(connection, getNewQueue());
+      try {
+        blockingQueue.put(client);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    LOG.info("Trashing client for connections [{0}]", connection);
+    for (Client c : blockingQueue) {
+      close(c);
+    }
+  }
+
+  private BlockingQueue<Client> getNewQueue() {
+    return new LinkedBlockingQueue<Client>(_maxConnectionsPerHost);
+  }
+
+  public Client getClient(Connection connection) throws TTransportException, IOException
{
+    BlockingQueue<Client> blockingQueue = getQueue(connection);
+    if (blockingQueue.isEmpty()) {
+      return newClient(connection);
+    }
+    try {
+      return blockingQueue.take();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public Client newClient(Connection connection) throws TTransportException, IOException
{
+    String host = connection.getHost();
+    int port = connection.getPort();
+    TSocket trans;
+    Socket socket;
+    if (connection.isProxy()) {
+      Proxy proxy = new Proxy(Type.SOCKS, new InetSocketAddress(connection.getProxyHost(),
connection.getProxyPort()));
+      socket = new Socket(proxy);
+    } else {
+      socket = new Socket();
+    }
+    socket.setTcpNoDelay(true);
+    socket.connect(new InetSocketAddress(host, port));
+    trans = new TSocket(socket);
+
+    TProtocol proto = new TBinaryProtocol(new TFramedTransport(trans));
+    Client client = new Client(proto);
+    return client;
+  }
+
+  public void close(Client client) {
+    client.getInputProtocol().getTransport().close();
+    client.getOutputProtocol().getTransport().close();
+  }
+
+}


Mime
View raw message