incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: Fixing a connection leak in the controller when using commands.
Date Tue, 19 May 2015 00:19:40 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master b1113edf5 -> 8fa8ececa


Fixing a connection leak in the controller when using commands.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/f6abe276
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/f6abe276
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/f6abe276

Branch: refs/heads/master
Commit: f6abe276402b4078dad8ff0cd78a38173c55d50a
Parents: 3915b9f
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon May 18 19:49:42 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon May 18 19:49:42 2015 -0400

----------------------------------------------------------------------
 .../org/apache/blur/command/BaseContext.java    |  3 +-
 .../blur/command/ControllerClusterContext.java  | 31 +++++++++---
 .../blur/command/ControllerCommandManager.java  | 53 +++++++++++---------
 3 files changed, 53 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f6abe276/blur-core/src/main/java/org/apache/blur/command/BaseContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/BaseContext.java b/blur-core/src/main/java/org/apache/blur/command/BaseContext.java
index 527bc29..9ea1945 100644
--- a/blur-core/src/main/java/org/apache/blur/command/BaseContext.java
+++ b/blur-core/src/main/java/org/apache/blur/command/BaseContext.java
@@ -23,10 +23,9 @@ import org.apache.blur.BlurConfiguration;
 import org.apache.blur.server.TableContext;
 
 public abstract class BaseContext {
-  
-//  public abstract Args getArgs();
 
   public abstract TableContext getTableContext(String table) throws IOException;
 
   public abstract BlurConfiguration getBlurConfiguration(String table) throws IOException;
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f6abe276/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
index 0fb95f1..192f743 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
@@ -2,6 +2,7 @@ package org.apache.blur.command;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -51,11 +52,21 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
   private final static Log LOG = LogFactory.getLog(ControllerClusterContext.class);
 
   private final TableContextFactory _tableContextFactory;
-  private final Map<Server, Client> _clientMap;
+  private final Map<Server, ClientWithConnection> _clientMap;
   private final ControllerCommandManager _manager;
   private final LayoutFactory _layoutFactory;
   private final BlurObjectSerDe _serDe = new BlurObjectSerDe();
 
+  static class ClientWithConnection {
+    final Client _client;
+    final Connection _connection;
+
+    ClientWithConnection(Client client, Connection connection) {
+      _client = client;
+      _connection = connection;
+    }
+  }
+
   public ControllerClusterContext(TableContextFactory tableContextFactory, LayoutFactory layoutFactory,
       ControllerCommandManager manager) throws IOException {
     _tableContextFactory = tableContextFactory;
@@ -64,13 +75,15 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
     _layoutFactory = layoutFactory;
   }
 
-  public Map<Server, Client> getBlurClientsForCluster(Set<Connection> serverConnections) throws IOException {
-    Map<Server, Client> clients = new HashMap<Server, Client>();
+  private Map<Server, ClientWithConnection> getBlurClientsForCluster(Set<Connection> serverConnections)
+      throws IOException {
+    Map<Server, ClientWithConnection> clients = new HashMap<Server, ClientWithConnection>();
     for (Connection serverConnection : serverConnections) {
       try {
         Client client = BlurClientManager.getClientPool().getClient(serverConnection);
         client.refresh();
-        clients.put(new Server(serverConnection.getHost() + ":" + serverConnection.getPort()), client);
+        ClientWithConnection clientWithConnection = new ClientWithConnection(client, serverConnection);
+        clients.put(new Server(serverConnection.getHost() + ":" + serverConnection.getPort()), clientWithConnection);
       } catch (TException e) {
         throw new IOException(e);
       }
@@ -100,8 +113,10 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
   @Override
   public void close() throws IOException {
     ClientPool clientPool = BlurClientManager.getClientPool();
-    for (Entry<Server, Client> e : _clientMap.entrySet()) {
-      clientPool.returnClient(new Connection(e.getKey().getServer()), e.getValue());
+    Collection<ClientWithConnection> values = _clientMap.values();
+    _clientMap.clear();
+    for (ClientWithConnection clientWithConnection : values) {
+      clientPool.returnClient(clientWithConnection._connection, clientWithConnection._client);
     }
   }
 
@@ -138,10 +153,10 @@ public class ControllerClusterContext extends ClusterContext implements Closeabl
   private Map<Server, Client> getClientMap(Command<?> command, Set<String> tables, Set<Shard> shards)
       throws IOException {
     Map<Server, Client> result = new HashMap<Server, Client>();
-    for (Entry<Server, Client> e : _clientMap.entrySet()) {
+    for (Entry<Server, ClientWithConnection> e : _clientMap.entrySet()) {
       Server server = e.getKey();
       if (_layoutFactory.isValidServer(server, tables, shards)) {
-        result.put(server, e.getValue());
+        result.put(server, e.getValue()._client);
       }
     }
     return result;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/f6abe276/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
index 386bae8..7b8282d 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerCommandManager.java
@@ -15,6 +15,7 @@ import org.apache.blur.command.commandtype.ServerReadCommand;
 import org.apache.blur.server.LayoutFactory;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.server.TableContextFactory;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -43,7 +44,7 @@ public class ControllerCommandManager extends BaseCommandManager {
 
   public Response execute(final TableContextFactory tableContextFactory, LayoutFactory layoutFactory,
       String commandName, ArgumentOverlay argumentOverlay) throws IOException, TimeoutException, ExceptionCollector {
-    final ClusterContext context = createCommandContext(tableContextFactory, layoutFactory);
+    final ControllerClusterContext context = createCommandContext(tableContextFactory, layoutFactory);
     final Command<?> command = getCommandObject(commandName, argumentOverlay);
     if (command == null) {
       throw new IOException("Command with name [" + commandName + "] not found.");
@@ -53,28 +54,32 @@ public class ControllerCommandManager extends BaseCommandManager {
       public Response call() throws Exception {
         // For those commands that do not implement cluster command, run them in
         // a base impl.
-
-        if (command instanceof IndexReadCommand) {
-          return executeIndexReadCommand(context, command);
-        }
-        if (command instanceof ServerReadCommand) {
-          return executeIndexReadCombiningCommand(context, command);
-        }
-        if (command instanceof ClusterIndexReadCommand) {
-          throw new RuntimeException("Not implemented");
-        }
-        if (command instanceof ClusterServerReadCommand) {
-          CombiningContext combiningContext = getCombiningContext(tableContextFactory);
-          return executeClusterReadCombiningCommand(context, command, combiningContext);
+        try {
+
+          if (command instanceof IndexReadCommand) {
+            return executeIndexReadCommand(context, command);
+          }
+          if (command instanceof ServerReadCommand) {
+            return executeIndexReadCombiningCommand(context, command);
+          }
+          if (command instanceof ClusterIndexReadCommand) {
+            throw new RuntimeException("Not implemented");
+          }
+          if (command instanceof ClusterServerReadCommand) {
+            CombiningContext combiningContext = getCombiningContext(tableContextFactory);
+            return executeClusterReadCombiningCommand(context, command, combiningContext);
+          }
+          if (command instanceof ClusterExecuteServerReadCommand) {
+            return executeClusterCommand(context, command);
+          }
+          if (command instanceof ClusterExecuteCommand) {
+            throw new RuntimeException("Not implemented");
+          }
+
+          throw new IOException("Command type of [" + command.getClass() + "] not supported.");
+        } finally {
+          IOUtils.closeQuietly(context);
         }
-        if (command instanceof ClusterExecuteServerReadCommand) {
-          return executeClusterCommand(context, command);
-        }
-        if (command instanceof ClusterExecuteCommand) {
-          throw new RuntimeException("Not implemented");
-        }
-
-        throw new IOException("Command type of [" + command.getClass() + "] not supported.");
       }
 
     }, command);
@@ -120,8 +125,8 @@ public class ControllerCommandManager extends BaseCommandManager {
     return Response.createNewServerResponse(result);
   }
 
-  private ClusterContext createCommandContext(TableContextFactory tableContextFactory, LayoutFactory layoutFactory)
-      throws IOException {
+  private ControllerClusterContext createCommandContext(TableContextFactory tableContextFactory,
+      LayoutFactory layoutFactory) throws IOException {
     return new ControllerClusterContext(tableContextFactory, layoutFactory, this);
   }
 


Mime
View raw message