Return-Path: X-Original-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1608D9280 for ; Fri, 24 Feb 2012 21:41:17 +0000 (UTC) Received: (qmail 55428 invoked by uid 500); 24 Feb 2012 21:41:17 -0000 Delivered-To: apmail-incubator-accumulo-commits-archive@incubator.apache.org Received: (qmail 55406 invoked by uid 500); 24 Feb 2012 21:41:17 -0000 Mailing-List: contact accumulo-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: accumulo-dev@incubator.apache.org Delivered-To: mailing list accumulo-commits@incubator.apache.org Received: (qmail 55397 invoked by uid 99); 24 Feb 2012 21:41:17 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Feb 2012 21:41:17 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Feb 2012 21:41:13 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 709A9238890D; Fri, 24 Feb 2012 21:40:51 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1293438 - in /incubator/accumulo/trunk: ./ src/core/ src/core/src/main/java/org/apache/accumulo/core/client/impl/ src/server/ src/server/src/main/java/org/apache/accumulo/server/master/tableOps/ src/server/src/main/java/org/apache/accumulo... Date: Fri, 24 Feb 2012 21:40:51 -0000 To: accumulo-commits@incubator.apache.org From: kturner@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120224214051.709A9238890D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kturner Date: Fri Feb 24 21:40:50 2012 New Revision: 1293438 URL: http://svn.apache.org/viewvc?rev=1293438&view=rev Log: ACCUMULO-422 * Spread bulk imports evenly across tservers * Closed all cached connections when a server connection has errors * Fixed random walk reporting of stuck nodes Modified: incubator/accumulo/trunk/ (props changed) incubator/accumulo/trunk/src/core/ (props changed) incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java incubator/accumulo/trunk/src/server/ (props changed) incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Node.java Propchange: incubator/accumulo/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Feb 24 21:40:50 2012 @@ -1,3 +1,3 @@ /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873,1245632 /incubator/accumulo/branches/1.3.5rc:1209938 -/incubator/accumulo/branches/1.4:1201902-1292862 +/incubator/accumulo/branches/1.4:1201902-1293435 Propchange: incubator/accumulo/trunk/src/core/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Feb 24 21:40:50 2012 @@ -1,3 +1,3 @@ /incubator/accumulo/branches/1.3.5rc/src/core:1209938 /incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215 -/incubator/accumulo/branches/1.4/src/core:1201902-1292862 +/incubator/accumulo/branches/1.4/src/core:1201902-1293435 Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java?rev=1293438&r1=1293437&r2=1293438&view=diff ============================================================================== --- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java (original) +++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java Fri Feb 24 21:40:50 2012 @@ -120,6 +120,10 @@ public class ServerClient { static volatile boolean warnedAboutTServersBeingDown = false; public static Pair getConnection(Instance instance) throws TTransportException { + return getConnection(instance, true); + } + + public static Pair getConnection(Instance instance, boolean preferCachedConnections) throws TTransportException { ArgumentChecker.notNull(instance); // create list of servers ArrayList servers = new ArrayList(); @@ -138,7 +142,7 @@ public class ServerClient { boolean opened = false; try { - Pair pair = ThriftTransportPool.getInstance().getAnyTransport(servers); + Pair pair = ThriftTransportPool.getInstance().getAnyTransport(servers, preferCachedConnections); ClientService.Iface client = ThriftUtil.createClient(new ClientService.Client.Factory(), pair.getSecond()); opened = true; warnedAboutTServersBeingDown = false; Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java?rev=1293438&r1=1293437&r2=1293438&view=diff ============================================================================== --- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java (original) +++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java Fri Feb 24 21:40:50 2012 @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.security.SecurityPermission; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -373,20 +374,30 @@ public class ThriftTransportPool { return getTransport(location, port, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); } - Pair getAnyTransport(List servers) throws TTransportException { + Pair getAnyTransport(List servers, boolean preferCachedConnection) throws TTransportException { servers = new ArrayList(servers); - synchronized (this) { - // atomically find an available location that is in the list - for (Entry> entry : cache.entrySet()) { - if (servers.contains(entry.getKey())) { - for (CachedConnection cachedConnection : entry.getValue()) { - if (!cachedConnection.isReserved()) { - cachedConnection.setReserved(true); - if (log.isTraceEnabled()) - log.trace("Using existing connection to " + entry.getKey().getLocation() + ":" + entry.getKey().getPort()); - return new Pair(entry.getKey().getLocation() + ":" + entry.getKey().getPort(), cachedConnection.transport); + if (preferCachedConnection) { + HashSet serversSet = new HashSet(servers); + + synchronized (this) { + + // randomly pick a server from the connection cache + serversSet.retainAll(cache.keySet()); + + if (serversSet.size() > 0) { + ArrayList cachedServers = new ArrayList(serversSet); + Collections.shuffle(cachedServers, random); + + for (ThriftTransportKey ttk : cachedServers) { + for (CachedConnection cachedConnection : cache.get(ttk)) { + if (!cachedConnection.isReserved()) { + cachedConnection.setReserved(true); + if (log.isTraceEnabled()) + log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort()); + return new Pair(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport); + } } } } @@ -396,8 +407,26 @@ public class ThriftTransportPool { int retryCount = 0; while (servers.size() > 0 && retryCount < 10) { int index = random.nextInt(servers.size()); + ThriftTransportKey ttk = servers.get(index); + + if (!preferCachedConnection) { + synchronized (this) { + List cachedConnList = cache.get(ttk); + if (cachedConnList != null) { + for (CachedConnection cachedConnection : cachedConnList) { + if (!cachedConnection.isReserved()) { + cachedConnection.setReserved(true); + if (log.isTraceEnabled()) + log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort()); + return new Pair(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport); + } + } + } + } + } + try { - return new Pair(servers.get(index).getLocation() + ":" + servers.get(index).getPort(), createNewTransport(servers.get(index))); + return new Pair(ttk.getLocation() + ":" + ttk.getPort(), createNewTransport(ttk)); } catch (TTransportException tte) { log.debug("Failed to connect to " + servers.get(index), tte); servers.remove(index); @@ -478,13 +507,15 @@ public class ThriftTransportPool { boolean existInCache = false; CachedTTransport ctsc = (CachedTTransport) tsc; + ArrayList closeList = new ArrayList(); + synchronized (this) { List ccl = cache.get(ctsc.getCacheKey()); for (Iterator iterator = ccl.iterator(); iterator.hasNext();) { CachedConnection cachedConnection = iterator.next(); if (cachedConnection.transport == tsc) { if (ctsc.sawError) { - tsc.close(); + closeList.add(cachedConnection); iterator.remove(); if (log.isTraceEnabled()) @@ -520,6 +551,26 @@ public class ThriftTransportPool { break; } } + + // remove all unreserved cached connection when a sever has an error, not just the connection that was returned + if (ctsc.sawError) { + for (Iterator iterator = ccl.iterator(); iterator.hasNext();) { + CachedConnection cachedConnection = iterator.next(); + if (!cachedConnection.isReserved()) { + closeList.add(cachedConnection); + iterator.remove(); + } + } + } + } + + // close outside of sync block + for (CachedConnection cachedConnection : closeList) { + try { + cachedConnection.transport.close(); + } catch (Exception e) { + log.debug("Failed to close connection w/ errors", e); + } } if (!existInCache) { Propchange: incubator/accumulo/trunk/src/server/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Feb 24 21:40:50 2012 @@ -1,3 +1,3 @@ /incubator/accumulo/branches/1.3.5rc/src/server:1209938 /incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611 -/incubator/accumulo/branches/1.4/src/server:1201902-1292862 +/incubator/accumulo/branches/1.4/src/server:1201902-1293435 Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1293438&r1=1293437&r2=1293438&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Fri Feb 24 21:40:50 2012 @@ -424,7 +424,10 @@ class LoadFiles extends MasterRepo { ClientService.Iface client = null; String server = null; try { - Pair pair = ServerClient.getConnection(master.getInstance()); + // get a connection to a random tablet server, do not prefer cached connections because + // this is running on the master and there are lots of connections to tablet servers + // serving the !METADATA tablets + Pair pair = ServerClient.getConnection(master.getInstance(), false); client = pair.getSecond(); server = pair.getFirst(); List attempt = Collections.singletonList(file); Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java?rev=1293438&r1=1293437&r2=1293438&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Module.java Fri Feb 24 21:40:50 2012 @@ -162,7 +162,7 @@ public class Module extends Node { this.xmlFile = xmlFile; loadFromXml(); } - + @Override public void visit(State state, Properties props) throws Exception { int maxHops, maxSec; @@ -286,24 +286,19 @@ public class Module extends Node { private void startTimer(final Node initNode) { runningLong.set(false); timer = new Thread(new Runnable() { - @Override public void run() { - while (!runningLong.get()) { - try { - systemTime = System.currentTimeMillis(); - synchronized (timer) { - timer.wait(time); - } - } catch (InterruptedException ie) { - return; - } + try { + systemTime = System.currentTimeMillis(); + Thread.sleep(time); + } catch (InterruptedException ie) { + return; } long timeSinceLastProgress = System.currentTimeMillis() - initNode.lastProgress(); if (timeSinceLastProgress > time) { log.warn("Node " + initNode + " has been running for " + timeSinceLastProgress / 1000.0 + " seconds. You may want to look into it."); + runningLong.set(true); } - runningLong.set(true); } }); initNode.makingProgress(); Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Node.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Node.java?rev=1293438&r1=1293437&r2=1293438&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Node.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/Node.java Fri Feb 24 21:40:50 2012 @@ -43,6 +43,11 @@ public abstract class Node { } @Override + public String toString() { + return this.getClass().getName(); + } + + @Override public int hashCode() { return toString().hashCode(); }