incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/6] git commit: Adding more tracer logic.
Date Tue, 17 Dec 2013 13:57:08 GMT
Updated Branches:
  refs/heads/master 356f3f4e2 -> 70597f44b


Adding more tracer logic.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/09c0bda3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/09c0bda3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/09c0bda3

Branch: refs/heads/master
Commit: 09c0bda31b3c20c7ad7a01f85b5b1fada200b0fd
Parents: 356f3f4
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Dec 16 20:37:14 2013 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Dec 17 08:52:09 2013 -0500

----------------------------------------------------------------------
 .../blur/thrift/BlurControllerServer.java       |  20 ++-
 .../apache/blur/thrift/BlurClientManager.java   |  27 ++--
 .../java/org/apache/blur/thrift/ClientPool.java | 143 ++++++++++---------
 3 files changed, 111 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/09c0bda3/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index b57d548..c78b6a9 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -338,6 +338,7 @@ public class BlurControllerServer extends TableAdmin implements Iface {
   @Override
   public BlurResults query(final String table, final BlurQuery blurQuery) throws BlurException, TException {
     checkTable(table);
+    Tracer trace = Trace.trace("query - setup", Trace.param("table", table), Trace.param("blurQuery", blurQuery));
     String cluster = _clusterStatus.getCluster(true, table);
     _queryChecker.checkQuery(blurQuery);
     checkSelectorFetchSize(blurQuery.getSelector());
@@ -346,11 +347,14 @@ public class BlurControllerServer extends TableAdmin implements Iface {
     if (blurQuery.getUuid() == null) {
       blurQuery.setUuid(UUID.randomUUID().toString());
     }
+    BlurUtil.setStartTime(blurQuery);
+    trace.done();
 
     BlurUtil.setStartTime(blurQuery);
 
     OUTER: for (int retries = 0; retries < _maxDefaultRetries; retries++) {
       try {
+        Tracer selectorTrace = Trace.trace("selector - setup", Trace.param("retries", retries));
         final AtomicLongArray facetCounts = BlurUtil.getAtomicLongArraySameLengthAsList(blurQuery.facets);
         Selector selector = blurQuery.getSelector();
         if (selector == null) {
@@ -367,6 +371,7 @@ public class BlurControllerServer extends TableAdmin implements Iface {
           }
         }
         blurQuery.setSelector(null);
+        selectorTrace.done();
 
         BlurCommand<BlurResultIterable> command = new BlurCommand<BlurResultIterable>() {
           @Override
@@ -385,8 +390,19 @@ public class BlurControllerServer extends TableAdmin implements Iface {
         MergerBlurResultIterable merger = new MergerBlurResultIterable(blurQuery);
         BlurResultIterable hitsIterable = null;
         try {
-          hitsIterable = scatterGather(getCluster(table), command, merger);
-          BlurResults results = convertToBlurResults(hitsIterable, blurQuery, facetCounts, _executor, selector, table);
+          Tracer scatterGatherTrace = Trace.trace("query - satterGather", Trace.param("retries", retries));
+          try {
+            hitsIterable = scatterGather(getCluster(table), command, merger);
+          } finally {
+            scatterGatherTrace.done();
+          }
+          BlurResults results;
+          Tracer convertToBlurResults = Trace.trace("query - convertToBlurResults", Trace.param("retries", retries));
+          try {
+            results = convertToBlurResults(hitsIterable, blurQuery, facetCounts, _executor, selector, table);
+          } finally {
+            convertToBlurResults.done();
+          }
           if (!validResults(results, shardCount, blurQuery)) {
             BlurClientManager.sleep(_defaultDelay, _maxDefaultDelay, retries, _maxDefaultRetries);
             Map<String, String> map = _shardServerLayout.get().get(table);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/09c0bda3/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
index 4556414..851361c 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/BlurClientManager.java
@@ -143,6 +143,7 @@ public class BlurClientManager {
   @SuppressWarnings("unchecked")
   public static <CLIENT, T> T execute(List<Connection> connections, AbstractCommand<CLIENT, T> command, int maxRetries,
       long backOffTime, long maxBackOffTime) throws BlurException, TException, IOException {
+    Tracer traceSetup = Trace.trace("execute - setup");
     LocalResources localResources = new LocalResources();
     AtomicReference<Client> client = localResources.client;
     Random random = localResources.random;
@@ -155,21 +156,27 @@ public class BlurClientManager {
     Collections.shuffle(shuffledConnections, random);
     boolean allBad = true;
     int connectionErrorCount = 0;
+    traceSetup.done();
     while (true) {
       for (Connection connection : shuffledConnections) {
-        if (isBadConnection(connection)) {
-          continue;
-        }
-        client.set(null);
+        Tracer traceConnectionSetup = Trace.trace("execute - connection setup");
         try {
-          client.set(_clientPool.getClient(connection));
-        } catch (IOException e) {
-          if (handleError(connection, client, retries, command, e, maxRetries, backOffTime, maxBackOffTime)) {
-            throw e;
-          } else {
-            markBadConnection(connection);
+          if (isBadConnection(connection)) {
             continue;
           }
+          client.set(null);
+          try {
+            client.set(_clientPool.getClient(connection));
+          } catch (IOException e) {
+            if (handleError(connection, client, retries, command, e, maxRetries, backOffTime, maxBackOffTime)) {
+              throw e;
+            } else {
+              markBadConnection(connection);
+              continue;
+            }
+          }
+        } finally {
+          traceConnectionSetup.done();
         }
         Tracer trace = null;
         try {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/09c0bda3/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
----------------------------------------------------------------------
diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java b/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
index ee9e3db..d1f959a 100644
--- a/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
+++ b/blur-thrift/src/main/java/org/apache/blur/thrift/ClientPool.java
@@ -48,7 +48,6 @@ import org.apache.blur.thrift.generated.Blur.Client;
 
 public class ClientPool {
 
-  
   private static final Log LOG = LogFactory.getLog(ClientPool.class);
   private static final Map<Connection, BlockingQueue<Client>> _connMap = new ConcurrentHashMap<Connection, BlockingQueue<Client>>();
   private int _maxConnectionsPerHost = Integer.MAX_VALUE;
@@ -62,95 +61,106 @@ public class ClientPool {
     try {
       BlurConfiguration config = new BlurConfiguration();
       _idleTimeBeforeClosingClient = config.getLong(BLUR_CLIENTPOOL_CLIENT_CLOSE_THRESHOLD,
-                      TimeUnit.SECONDS.toMillis(30));
-      _clientPoolCleanFrequency = config.getLong(BLUR_CLIENTPOOL_CLIENT_CLEAN_FREQUENCY,
-              TimeUnit.SECONDS.toMillis(300)); 
+          TimeUnit.SECONDS.toMillis(30));
+      _clientPoolCleanFrequency = config
+          .getLong(BLUR_CLIENTPOOL_CLIENT_CLEAN_FREQUENCY, TimeUnit.SECONDS.toMillis(300));
       _maxFrameSize = config.getInt(BLUR_THRIFT_MAX_FRAME_SIZE, 16384000);
     } catch (Exception e) {
-        throw new RuntimeException(e);
+      throw new RuntimeException(e);
     }
     checkAndRemoveStaleClients();
   }
-  
+
   private static void checkAndRemoveStaleClients() {
     _master = new Thread(new Runnable() {
-    @Override
-    public void run() {
-      while (_running.get()) {
-        try {
-          Thread.sleep(getClientPoolCleanFrequency());
-          List<Thread> workers = new ArrayList<Thread>();
-          int num = 0;
-          for (Connection connection : _connMap.keySet()) {
-            Thread thread = new poolWorker(connection);
-            thread.setName("client-cleaner_" + ++num);
-            thread.start();
-            workers.add(thread);
-          }
-          for(Thread t : workers) {
-            t.join();
-          }
-        } catch (InterruptedException e) {
+      @Override
+      public void run() {
+        while (_running.get()) {
+          try {
+            Thread.sleep(getClientPoolCleanFrequency());
+            List<Thread> workers = new ArrayList<Thread>();
+            int num = 0;
+            for (Connection connection : _connMap.keySet()) {
+              Thread thread = new PoolWorker(connection);
+              thread.setName("client-cleaner_" + ++num);
+              thread.start();
+              workers.add(thread);
+            }
+            for (Thread t : workers) {
+              t.join();
+            }
+          } catch (InterruptedException e) {
             throw new RuntimeException(e);
+          }
         }
       }
-    }
     });
     _master.setDaemon(true);
     _master.setName("Blur-Client-Connection-Cleaner");
     _master.start();
   }
-  
-  private static class poolWorker extends Thread {
-	private final Connection connection;
-	public poolWorker(Connection conn) {
-	  this.connection = conn;
-	}
-	@Override
-	public void run() {
-	  BlockingQueue<Client> bq = _connMap.get(connection);
-	  synchronized(connection) {
-	    if (!_connMap.get(connection).isEmpty()) {
-	      Iterator<Client> it = bq.iterator();
-	      try {
-	        while (it.hasNext()) {
-		      Client client = it.next();
-		      if (((WeightedClient)client).isStale()) {
-		    	close(client);
-		        bq.take();
-		      } else break;
-		    }
-	      } catch (InterruptedException e) {
-	          throw new RuntimeException(e);
+
+  private static class PoolWorker extends Thread {
+    private final Connection _connection;
+
+    public PoolWorker(Connection conn) {
+      _connection = conn;
+    }
+
+    @Override
+    public void run() {
+      BlockingQueue<Client> bq = _connMap.get(_connection);
+      synchronized (_connection) {
+        if (!_connMap.get(_connection).isEmpty()) {
+          Iterator<Client> it = bq.iterator();
+          try {
+            while (it.hasNext()) {
+              Client client = it.next();
+              if (((WeightedClient) client).isStale()) {
+                close(client);
+                bq.take();
+              } else
+                break;
+            }
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
           }
-	    }
-	  }
-	}
+        }
+      }
+    }
   }
 
   private class WeightedClient extends Client {
-	private long _enqueueTime;
-	public WeightedClient(TProtocol prot) {
-	  super(prot);
-	}
-	public void setEnqueTime(long _currentTime) {
-	  this._enqueueTime = _currentTime;
-	}
-	public boolean isStale() {
-	  long diff = System.currentTimeMillis() - _enqueueTime;
-   	  return diff >= getClientIdleTimeThreshold();
-	}
+    private long _enqueueTime;
+
+    public WeightedClient(TProtocol prot) {
+      super(prot);
+    }
+
+    public void setEnqueTime(long _currentTime) {
+      _enqueueTime = _currentTime;
+    }
+
+    public boolean isStale() {
+      long diff = System.currentTimeMillis() - _enqueueTime;
+      return diff >= getClientIdleTimeThreshold();
+    }
+  }
+
+  private static long getClientIdleTimeThreshold() {
+    return _idleTimeBeforeClosingClient;
   }
 
-  private static long getClientIdleTimeThreshold() { return _idleTimeBeforeClosingClient; }
-  private static long getClientPoolCleanFrequency() { return _clientPoolCleanFrequency; }
+  private static long getClientPoolCleanFrequency() {
+    return _clientPoolCleanFrequency;
+  }
 
   public void returnClient(Connection connection, Client client) {
     try {
-      ((WeightedClient) client).setEnqueTime(System.currentTimeMillis());	
+      ((WeightedClient) client).setEnqueTime(System.currentTimeMillis());
       getQueue(connection).put(client);
     } catch (InterruptedException e) {
-        throw new RuntimeException(e);
+      throw new RuntimeException(e);
     }
   }
 
@@ -173,10 +183,9 @@ public class ClientPool {
       try {
         blockingQueue.put(client);
       } catch (InterruptedException e) {
-          throw new RuntimeException(e);
+        throw new RuntimeException(e);
       }
     }
-
     LOG.info("Trashing client for connections [{0}]", connection);
     for (Client c : blockingQueue) {
       close(c);
@@ -216,7 +225,7 @@ public class ClientPool {
     socket.connect(new InetSocketAddress(host, port), timeout);
     trans = new TSocket(socket);
 
-    TProtocol proto = new TBinaryProtocol(new TFramedTransport(trans,_maxFrameSize));
+    TProtocol proto = new TBinaryProtocol(new TFramedTransport(trans, _maxFrameSize));
     return new WeightedClient(proto);
   }
 


Mime
View raw message