Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8B56F1057B for ; Mon, 6 Jan 2014 22:35:47 +0000 (UTC) Received: (qmail 30979 invoked by uid 500); 6 Jan 2014 22:35:47 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 30890 invoked by uid 500); 6 Jan 2014 22:35:47 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 30873 invoked by uid 99); 6 Jan 2014 22:35:47 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Jan 2014 22:35:47 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 22156882F3F; Mon, 6 Jan 2014 22:35:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kturner@apache.org To: commits@accumulo.apache.org Date: Mon, 06 Jan 2014 22:35:49 -0000 Message-Id: <9585c61eeca6488b854bf293b35293ac@git.apache.org> In-Reply-To: <286c8bbdf49b42dd8607c62b94ebb3ef@git.apache.org> References: <286c8bbdf49b42dd8607c62b94ebb3ef@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT Conflicts: core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f624d402 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f624d402 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f624d402 Branch: refs/heads/1.5.1-SNAPSHOT Commit: f624d402e94d6959dffaab53b9afdd043bd4e8ea Parents: c23126a e946ba0 Author: Keith Turner Authored: Mon Jan 6 17:09:20 2014 -0500 Committer: Keith Turner Committed: Mon Jan 6 17:09:20 2014 -0500 ---------------------------------------------------------------------- .../apache/accumulo/core/client/Instance.java | 8 +--- .../accumulo/core/client/ZooKeeperInstance.java | 50 ++------------------ .../core/client/impl/ThriftTransportPool.java | 16 ++----- .../accumulo/core/client/mock/MockInstance.java | 5 -- .../apache/accumulo/core/util/ThriftUtil.java | 4 -- .../core/client/impl/TabletLocatorImplTest.java | 5 -- .../accumulo/fate/zookeeper/ZooCache.java | 7 --- .../accumulo/fate/zookeeper/ZooReader.java | 12 ----- .../accumulo/server/client/HdfsZooInstance.java | 5 -- 9 files changed, 9 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f624d402/core/src/main/java/org/apache/accumulo/core/client/Instance.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/Instance.java index c67220d,0000000..c5b0a1e mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/client/Instance.java +++ b/core/src/main/java/org/apache/accumulo/core/client/Instance.java @@@ -1,172 -1,0 +1,166 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; + +/** + * This class represents the information a client needs to know to connect to an instance of accumulo. + * + */ +public interface Instance { + /** + * Returns the location of the tablet server that is serving the root tablet. + * + * @return location in "hostname:port" form + */ + public abstract String getRootTabletLocation(); + + /** + * Returns the location(s) of the accumulo master and any redundant servers. + * + * @return a list of locations in "hostname:port" form + */ + public abstract List getMasterLocations(); + + /** + * Returns a unique string that identifies this instance of accumulo. + * + * @return a UUID + */ + public abstract String getInstanceID(); + + /** + * Returns the instance name given at system initialization time. + * + * @return current instance name + */ + public abstract String getInstanceName(); + + /** + * Returns a comma-separated list of zookeeper servers the instance is using. + * + * @return the zookeeper servers this instance is using in "hostname:port" form + */ + public abstract String getZooKeepers(); + + /** + * Returns the zookeeper connection timeout. + * + * @return the configured timeout to connect to zookeeper + */ + public abstract int getZooKeepersSessionTimeOut(); + + /** + * Returns a connection to accumulo. + * + * @param user + * a valid accumulo user + * @param pass + * A UTF-8 encoded password. The password may be cleared after making this call. + * @return the accumulo Connector + * @throws AccumuloException + * when a generic exception occurs + * @throws AccumuloSecurityException + * when a user's credentials are invalid + * @deprecated since 1.5, use {@link #getConnector(String, AuthenticationToken)} with {@link PasswordToken} + */ + @Deprecated + public abstract Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException; + + /** + * Returns a connection to accumulo. + * + * @param auth + * An Credentials object. + * @return the accumulo Connector + * @throws AccumuloException + * when a generic exception occurs + * @throws AccumuloSecurityException + * when a user's credentials are invalid + * @deprecated since 1.5, use {@link #getConnector(String, AuthenticationToken)} with {@link PasswordToken} + */ + @Deprecated + public abstract Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException; + + /** + * Returns a connection to accumulo. + * + * @param user + * a valid accumulo user + * @param pass + * A UTF-8 encoded password. The password may be cleared after making this call. + * @return the accumulo Connector + * @throws AccumuloException + * when a generic exception occurs + * @throws AccumuloSecurityException + * when a user's credentials are invalid + * @deprecated since 1.5, use {@link #getConnector(String, AuthenticationToken)} with {@link PasswordToken} + */ + @Deprecated + public abstract Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException; + + /** + * Returns a connection to this instance of accumulo. + * + * @param user + * a valid accumulo user + * @param pass + * If a mutable CharSequence is passed in, it may be cleared after this call. + * @return the accumulo Connector + * @throws AccumuloException + * when a generic exception occurs + * @throws AccumuloSecurityException + * when a user's credentials are invalid + * @deprecated since 1.5, use {@link #getConnector(String, AuthenticationToken)} with {@link PasswordToken} + */ + @Deprecated + public abstract Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException; - - /** - * Closes up the instance to free up all associated resources. You should try to reuse an Instance as much as you can because there is some location caching - * stored which will enhance performance. - */ - public abstract void close(); - ++ + /** + * Returns the AccumuloConfiguration to use when interacting with this instance. + * + * @return the AccumuloConfiguration that specifies properties related to interacting with this instance + */ + public abstract AccumuloConfiguration getConfiguration(); + + /** + * Set the AccumuloConfiguration to use when interacting with this instance. + * + * @param conf + * accumulo configuration + */ + public abstract void setConfiguration(AccumuloConfiguration conf); + + /** + * Returns a connection to this instance of accumulo. + * + * @param principal + * a valid accumulo user + * @param token + * Use the token type configured for the Accumulo instance you are connecting to. An Accumulo instance with default configurations will use + * {@link PasswordToken} + * @since 1.5.0 + */ + public abstract Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException; + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f624d402/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java index d96091b,0000000..ccfb328 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java @@@ -1,353 -1,0 +1,313 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.UUID; - import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.impl.ConnectorImpl; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.file.FileUtil; +import org.apache.accumulo.core.security.CredentialHelper; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.util.ArgumentChecker; +import org.apache.accumulo.core.util.ByteBufferUtil; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.OpTimer; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +/** + *

+ * An implementation of instance that looks in zookeeper to find information needed to connect to an instance of accumulo. + * + *

+ * The advantage of using zookeeper to obtain information about accumulo is that zookeeper is highly available, very responsive, and supports caching. + * + *

+ * Because it is possible for multiple instances of accumulo to share a single set of zookeeper servers, all constructors require an accumulo instance name. + * + * If you do not know the instance names then run accumulo org.apache.accumulo.server.util.ListInstances on an accumulo server. + * + */ + +public class ZooKeeperInstance implements Instance { + + private static final Logger log = Logger.getLogger(ZooKeeperInstance.class); + + private String instanceId = null; + private String instanceName = null; + + private final ZooCache zooCache; + + private final String zooKeepers; + + private final int zooKeepersSessionTimeOut; + - private volatile boolean closed = false; - + /** + * + * @param instanceName + * The name of specific accumulo instance. This is set at initialization time. + * @param zooKeepers + * A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port. + */ + + public ZooKeeperInstance(String instanceName, String zooKeepers) { + this(instanceName, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)); + } + + /** + * + * @param instanceName + * The name of specific accumulo instance. This is set at initialization time. + * @param zooKeepers + * A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port. + * @param sessionTimeout + * zoo keeper session time out in milliseconds. + */ + + public ZooKeeperInstance(String instanceName, String zooKeepers, int sessionTimeout) { + ArgumentChecker.notNull(instanceName, zooKeepers); + this.instanceName = instanceName; + this.zooKeepers = zooKeepers; + this.zooKeepersSessionTimeOut = sessionTimeout; + zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout); + getInstanceID(); - clientInstances.incrementAndGet(); + } + + /** + * + * @param instanceId + * The UUID that identifies the accumulo instance you want to connect to. + * @param zooKeepers + * A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port. + */ + + public ZooKeeperInstance(UUID instanceId, String zooKeepers) { + this(instanceId, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)); + } + + /** + * + * @param instanceId + * The UUID that identifies the accumulo instance you want to connect to. + * @param zooKeepers + * A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port. + * @param sessionTimeout + * zoo keeper session time out in milliseconds. + */ + + public ZooKeeperInstance(UUID instanceId, String zooKeepers, int sessionTimeout) { + ArgumentChecker.notNull(instanceId, zooKeepers); + this.instanceId = instanceId.toString(); + this.zooKeepers = zooKeepers; + this.zooKeepersSessionTimeOut = sessionTimeout; + zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout); - clientInstances.incrementAndGet(); + } + + @Override - public synchronized String getInstanceID() { - if (closed) - throw new RuntimeException("ZooKeeperInstance has been closed."); ++ public String getInstanceID() { + if (instanceId == null) { + // want the instance id to be stable for the life of this instance object, + // so only get it once + String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName; + byte[] iidb = zooCache.get(instanceNamePath); + if (iidb == null) { + throw new RuntimeException("Instance name " + instanceName + + " does not exist in zookeeper. Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list."); + } + instanceId = new String(iidb, Constants.UTF8); + } + + if (zooCache.get(Constants.ZROOT + "/" + instanceId) == null) { + if (instanceName == null) + throw new RuntimeException("Instance id " + instanceId + " does not exist in zookeeper"); + throw new RuntimeException("Instance id " + instanceId + " pointed to by the name " + instanceName + " does not exist in zookeeper"); + } + + return instanceId; + } + + @Override - public synchronized List getMasterLocations() { - if (closed) - throw new RuntimeException("ZooKeeperInstance has been closed."); ++ public List getMasterLocations() { + String masterLocPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK; + + OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up master location in zoocache."); + byte[] loc = ZooUtil.getLockData(zooCache, masterLocPath); + opTimer.stop("Found master at " + (loc == null ? null : new String(loc)) + " in %DURATION%"); + + if (loc == null) { + return Collections.emptyList(); + } + + return Collections.singletonList(new String(loc)); + } + + @Override - public synchronized String getRootTabletLocation() { - if (closed) - throw new RuntimeException("ZooKeeperInstance has been closed."); ++ public String getRootTabletLocation() { + String zRootLocPath = ZooUtil.getRoot(this) + Constants.ZROOT_TABLET_LOCATION; + + OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zookeeper."); + byte[] loc = zooCache.get(zRootLocPath); + opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%"); + + if (loc == null) { + return null; + } + + return new String(loc).split("\\|")[0]; + } + + @Override - public synchronized String getInstanceName() { - if (closed) - throw new RuntimeException("ZooKeeperInstance has been closed."); ++ public String getInstanceName() { + if (instanceName == null) + instanceName = lookupInstanceName(zooCache, UUID.fromString(getInstanceID())); + + return instanceName; + } + + @Override + public String getZooKeepers() { + return zooKeepers; + } + + @Override + public int getZooKeepersSessionTimeOut() { + return zooKeepersSessionTimeOut; + } + + @Override + @Deprecated + public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException { + return getConnector(user, TextUtil.getBytes(new Text(pass.toString()))); + } + + @Override + @Deprecated + public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException { + return getConnector(user, ByteBufferUtil.toBytes(pass)); + } + + @Override + public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException { + return getConnector(CredentialHelper.create(principal, token, getInstanceID())); + } + + @SuppressWarnings("deprecation") + private Connector getConnector(TCredentials credential) throws AccumuloException, AccumuloSecurityException { + return new ConnectorImpl(this, credential); + } + + @Override + @Deprecated + public Connector getConnector(String principal, byte[] pass) throws AccumuloException, AccumuloSecurityException { - if (closed) { - throw new RuntimeException("ZooKeeperInstance has been closed."); - } else { - return getConnector(principal, new PasswordToken(pass)); - } ++ return getConnector(principal, new PasswordToken(pass)); + } + + private AccumuloConfiguration conf = null; + + @Override + public AccumuloConfiguration getConfiguration() { + if (conf == null) + conf = AccumuloConfiguration.getDefaultConfiguration(); + return conf; + } + + @Override + public void setConfiguration(AccumuloConfiguration conf) { + this.conf = conf; + } + + /** + * @deprecated Use {@link #lookupInstanceName(org.apache.accumulo.fate.zookeeper.ZooCache, UUID)} instead + */ + @Deprecated + public static String lookupInstanceName(org.apache.accumulo.core.zookeeper.ZooCache zooCache, UUID instanceId) { + return lookupInstanceName((ZooCache) zooCache, instanceId); + } + + /** + * Given a zooCache and instanceId, look up the instance name. + * + * @param zooCache + * @param instanceId + * @return the instance name + */ + public static String lookupInstanceName(ZooCache zooCache, UUID instanceId) { + ArgumentChecker.notNull(zooCache, instanceId); + for (String name : zooCache.getChildren(Constants.ZROOT + Constants.ZINSTANCES)) { + String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + name; + byte[] bytes = zooCache.get(instanceNamePath); + UUID iid = UUID.fromString(new String(bytes, Constants.UTF8)); + if (iid.equals(instanceId)) { + return name; + } + } + return null; + } + + /** + * To be moved to server code. Only lives here to support certain client side utilities to minimize command-line options. + */ + @Deprecated + public static String getInstanceIDFromHdfs(Path instanceDirectory) { + try { + FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), AccumuloConfiguration.getSiteConfiguration()); + FileStatus[] files = null; + try { + files = fs.listStatus(instanceDirectory); + } catch (FileNotFoundException ex) { + // ignored + } + log.debug("Trying to read instance id from " + instanceDirectory); + if (files == null || files.length == 0) { + log.error("unable obtain instance id at " + instanceDirectory); + throw new RuntimeException("Accumulo not initialized, there is no instance id at " + instanceDirectory); + } else if (files.length != 1) { + log.error("multiple potential instances in " + instanceDirectory); + throw new RuntimeException("Accumulo found multiple possible instance ids in " + instanceDirectory); + } else { + String result = files[0].getPath().getName(); + return result; + } + } catch (IOException e) { + throw new RuntimeException("Accumulo not initialized, there is no instance id at " + instanceDirectory, e); + } + } + + @Deprecated + @Override + public Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException { + return getConnector(auth.user, auth.password); + } - - static private final AtomicInteger clientInstances = new AtomicInteger(0); - - @Override - public synchronized void close() { - if (!closed && clientInstances.decrementAndGet() == 0) { - try { - zooCache.close(); - ThriftUtil.close(); - } catch (RuntimeException e) { - clientInstances.incrementAndGet(); - throw e; - } - } - closed = true; - } - - @Override - public void finalize() { - // This method intentionally left blank. Users need to explicitly close Instances if they want things cleaned up nicely. - if (!closed) - log.warn("ZooKeeperInstance being cleaned up without being closed. Please remember to call close() before dereferencing to clean up threads."); - } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f624d402/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java index 41b2527,0000000..ceeab21 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java @@@ -1,617 -1,0 +1,607 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.impl; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.SecurityPermission; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.TTimeoutTransport; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.log4j.Logger; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class ThriftTransportPool { + private static SecurityPermission TRANSPORT_POOL_PERMISSION = new SecurityPermission("transportPoolPermission"); + + private static final Random random = new Random(); + private long killTime = 1000 * 3; + + private Map> cache = new HashMap>(); + private Map errorCount = new HashMap(); + private Map errorTime = new HashMap(); + private Set serversWarnedAbout = new HashSet(); + + private static final Logger log = Logger.getLogger(ThriftTransportPool.class); + + private static final Long ERROR_THRESHOLD = 20l; + private static final int STUCK_THRESHOLD = 2 * 60 * 1000; + + private static class CachedConnection { + + public CachedConnection(CachedTTransport t) { + this.transport = t; + } + + void setReserved(boolean reserved) { + this.transport.setReserved(reserved); + } + + boolean isReserved() { + return this.transport.reserved; + } + + CachedTTransport transport; + + long lastReturnTime; + } + + private static class Closer implements Runnable { + final ThriftTransportPool pool; - final AtomicBoolean stop; + - public Closer(ThriftTransportPool pool, AtomicBoolean stop) { ++ public Closer(ThriftTransportPool pool) { + this.pool = pool; - this.stop = stop; + } + + public void run() { - while (!stop.get()) { ++ while (true) { + + ArrayList connectionsToClose = new ArrayList(); + + synchronized (pool) { + for (List ccl : pool.cache.values()) { + Iterator iter = ccl.iterator(); + while (iter.hasNext()) { + CachedConnection cachedConnection = iter.next(); + + if (!cachedConnection.isReserved() && System.currentTimeMillis() - cachedConnection.lastReturnTime > pool.killTime) { + connectionsToClose.add(cachedConnection); + iter.remove(); + } + } + } + + for (List ccl : pool.cache.values()) { + for (CachedConnection cachedConnection : ccl) { + cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD); + } + } + + Iterator> iter = pool.errorTime.entrySet().iterator(); + while (iter.hasNext()) { + Entry entry = iter.next(); + long delta = System.currentTimeMillis() - entry.getValue(); + if (delta >= STUCK_THRESHOLD) { + pool.errorCount.remove(entry.getKey()); + iter.remove(); + } + } + } + + // close connections outside of sync block + for (CachedConnection cachedConnection : connectionsToClose) { + cachedConnection.transport.close(); + } + + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + static class CachedTTransport extends TTransport { + + private ThriftTransportKey cacheKey; + private TTransport wrappedTransport; + private boolean sawError = false; + + private volatile String ioThreadName = null; + private volatile long ioStartTime = 0; + private volatile boolean reserved = false; + + private String stuckThreadName = null; + + int ioCount = 0; + int lastIoCount = -1; + + private void sawError(Exception e) { + sawError = true; + } + + final void setReserved(boolean reserved) { + this.reserved = reserved; + if (reserved) { + ioThreadName = Thread.currentThread().getName(); + ioCount = 0; + lastIoCount = -1; + } else { + if ((ioCount & 1) == 1) { + // connection unreserved, but it seems io may still be + // happening + log.warn("Connection returned to thrift connection pool that may still be in use " + ioThreadName + " " + Thread.currentThread().getName(), + new Exception()); + } + + ioCount = 0; + lastIoCount = -1; + ioThreadName = null; + } + checkForStuckIO(STUCK_THRESHOLD); + } + + final void checkForStuckIO(long threshold) { + /* + * checking for stuck io needs to be light weight. + * + * Tried to call System.currentTimeMillis() and Thread.currentThread() before every io operation.... this dramatically slowed things down. So switched to + * incrementing a counter before and after each io operation. + */ + + if ((ioCount & 1) == 1) { + // when ioCount is odd, it means I/O is currently happening + if (ioCount == lastIoCount) { + // still doing same I/O operation as last time this + // functions was called + long delta = System.currentTimeMillis() - ioStartTime; + if (delta >= threshold && stuckThreadName == null) { + stuckThreadName = ioThreadName; + log.warn("Thread \"" + ioThreadName + "\" stuck on IO to " + cacheKey + " for at least " + delta + " ms"); + } + } else { + // remember this ioCount and the time we saw it, need to see + // if it changes + lastIoCount = ioCount; + ioStartTime = System.currentTimeMillis(); + + if (stuckThreadName != null) { + // doing I/O, but ioCount changed so no longer stuck + log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO to " + cacheKey + " sawError = " + sawError); + stuckThreadName = null; + } + } + } else { + // I/O is not currently happening + if (stuckThreadName != null) { + // no longer stuck, and was stuck in the past + log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO to " + cacheKey + " sawError = " + sawError); + stuckThreadName = null; + } + } + } + + public CachedTTransport(TTransport transport, ThriftTransportKey cacheKey2) { + this.wrappedTransport = transport; + this.cacheKey = cacheKey2; + } + + public boolean isOpen() { + return wrappedTransport.isOpen(); + } + + public void open() throws TTransportException { + try { + ioCount++; + wrappedTransport.open(); + } catch (TTransportException tte) { + sawError(tte); + throw tte; + } finally { + ioCount++; + } + } + + public int read(byte[] arg0, int arg1, int arg2) throws TTransportException { + try { + ioCount++; + return wrappedTransport.read(arg0, arg1, arg2); + } catch (TTransportException tte) { + sawError(tte); + throw tte; + } finally { + ioCount++; + } + } + + public int readAll(byte[] arg0, int arg1, int arg2) throws TTransportException { + try { + ioCount++; + return wrappedTransport.readAll(arg0, arg1, arg2); + } catch (TTransportException tte) { + sawError(tte); + throw tte; + } finally { + ioCount++; + } + } + + public void write(byte[] arg0, int arg1, int arg2) throws TTransportException { + try { + ioCount++; + wrappedTransport.write(arg0, arg1, arg2); + } catch (TTransportException tte) { + sawError(tte); + throw tte; + } finally { + ioCount++; + } + } + + public void write(byte[] arg0) throws TTransportException { + try { + ioCount++; + wrappedTransport.write(arg0); + } catch (TTransportException tte) { + sawError(tte); + throw tte; + } finally { + ioCount++; + } + } + + public void close() { + try { + ioCount++; + wrappedTransport.close(); + } finally { + ioCount++; + } + + } + + public void flush() throws TTransportException { + try { + ioCount++; + wrappedTransport.flush(); + } catch (TTransportException tte) { + sawError(tte); + throw tte; + } finally { + ioCount++; + } + } + + public boolean peek() { + try { + ioCount++; + return wrappedTransport.peek(); + } finally { + ioCount++; + } + } + + public byte[] getBuffer() { + try { + ioCount++; + return wrappedTransport.getBuffer(); + } finally { + ioCount++; + } + } + + public int getBufferPosition() { + try { + ioCount++; + return wrappedTransport.getBufferPosition(); + } finally { + ioCount++; + } + } + + public int getBytesRemainingInBuffer() { + try { + ioCount++; + return wrappedTransport.getBytesRemainingInBuffer(); + } finally { + ioCount++; + } + } + + public void consumeBuffer(int len) { + try { + ioCount++; + wrappedTransport.consumeBuffer(len); + } finally { + ioCount++; + } + } + + public ThriftTransportKey getCacheKey() { + return cacheKey; + } + + } + + private ThriftTransportPool() {} + + public TTransport getTransport(String location, int port) throws TTransportException { + return getTransport(location, port, 0); + } + + public TTransport getTransportWithDefaultTimeout(InetSocketAddress addr, AccumuloConfiguration conf) throws TTransportException { + return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); + } + + public TTransport getTransport(InetSocketAddress addr, long timeout) throws TTransportException { + return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), timeout); + } + + public TTransport getTransportWithDefaultTimeout(String location, int port, AccumuloConfiguration conf) throws TTransportException { + return getTransport(location, port, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); + } + + Pair getAnyTransport(List servers, boolean preferCachedConnection) throws TTransportException { + + servers = new ArrayList(servers); + + if (preferCachedConnection) { + HashSet serversSet = new HashSet(servers); + + synchronized (this) { + + // randomly pick a server from the connection cache + serversSet.retainAll(cache.keySet()); + + if (serversSet.size() > 0) { + ArrayList cachedServers = new ArrayList(serversSet); + Collections.shuffle(cachedServers, random); + + for (ThriftTransportKey ttk : cachedServers) { + for (CachedConnection cachedConnection : cache.get(ttk)) { + if (!cachedConnection.isReserved()) { + cachedConnection.setReserved(true); + if (log.isTraceEnabled()) + log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort()); + return new Pair(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport); + } + } + } + } + } + } + + int retryCount = 0; + while (servers.size() > 0 && retryCount < 10) { + int index = random.nextInt(servers.size()); + ThriftTransportKey ttk = servers.get(index); + + if (!preferCachedConnection) { + synchronized (this) { + List cachedConnList = cache.get(ttk); + if (cachedConnList != null) { + for (CachedConnection cachedConnection : cachedConnList) { + if (!cachedConnection.isReserved()) { + cachedConnection.setReserved(true); + if (log.isTraceEnabled()) + log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort() + " timeout " + ttk.getTimeout()); + return new Pair(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport); + } + } + } + } + } + + try { + return new Pair(ttk.getLocation() + ":" + ttk.getPort(), createNewTransport(ttk)); + } catch (TTransportException tte) { + log.debug("Failed to connect to " + servers.get(index), tte); + servers.remove(index); + retryCount++; + } + } + + throw new TTransportException("Failed to connect to a server"); + } + + public TTransport getTransport(String location, int port, long milliseconds) throws TTransportException { + return getTransport(new ThriftTransportKey(location, port, milliseconds)); + } + + private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException { + synchronized (this) { + // atomically reserve location if it exist in cache + List ccl = cache.get(cacheKey); + + if (ccl == null) { + ccl = new LinkedList(); + cache.put(cacheKey, ccl); + } + + for (CachedConnection cachedConnection : ccl) { + if (!cachedConnection.isReserved()) { + cachedConnection.setReserved(true); + if (log.isTraceEnabled()) + log.trace("Using existing connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort()); + return cachedConnection.transport; + } + } + } + + return createNewTransport(cacheKey); + } + + private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException { + TTransport transport; + if (cacheKey.getTimeout() == 0) { + transport = AddressUtil.createTSocket(cacheKey.getLocation(), cacheKey.getPort()); + } else { + try { + transport = TTimeoutTransport.create(AddressUtil.parseAddress(cacheKey.getLocation(), cacheKey.getPort()), cacheKey.getTimeout()); + } catch (IOException ex) { + throw new TTransportException(ex); + } + } + transport = ThriftUtil.transportFactory().getTransport(transport); + transport.open(); + + if (log.isTraceEnabled()) + log.trace("Creating new connection to connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort()); + + CachedTTransport tsc = new CachedTTransport(transport, cacheKey); + + CachedConnection cc = new CachedConnection(tsc); + cc.setReserved(true); + + synchronized (this) { + List ccl = cache.get(cacheKey); + + if (ccl == null) { + ccl = new LinkedList(); + cache.put(cacheKey, ccl); + } + + ccl.add(cc); + } + return cc.transport; + } + + public void returnTransport(TTransport tsc) { + if (tsc == null) { + return; + } + + boolean existInCache = false; + CachedTTransport ctsc = (CachedTTransport) tsc; + + ArrayList closeList = new ArrayList(); + + synchronized (this) { + List ccl = cache.get(ctsc.getCacheKey()); + for (Iterator iterator = ccl.iterator(); iterator.hasNext();) { + CachedConnection cachedConnection = iterator.next(); + if (cachedConnection.transport == tsc) { + if (ctsc.sawError) { + closeList.add(cachedConnection); + iterator.remove(); + + if (log.isTraceEnabled()) + log.trace("Returned connection had error " + ctsc.getCacheKey()); + + Long ecount = errorCount.get(ctsc.getCacheKey()); + if (ecount == null) + ecount = 0l; + ecount++; + errorCount.put(ctsc.getCacheKey(), ecount); + + Long etime = errorTime.get(ctsc.getCacheKey()); + if (etime == null) { + errorTime.put(ctsc.getCacheKey(), System.currentTimeMillis()); + } + + if (ecount >= ERROR_THRESHOLD && !serversWarnedAbout.contains(ctsc.getCacheKey())) { + log.warn("Server " + ctsc.getCacheKey() + " had " + ecount + " failures in a short time period, will not complain anymore "); + serversWarnedAbout.add(ctsc.getCacheKey()); + } + + cachedConnection.setReserved(false); + + } else { + + if (log.isTraceEnabled()) + log.trace("Returned connection " + ctsc.getCacheKey() + " ioCount : " + cachedConnection.transport.ioCount); + + cachedConnection.lastReturnTime = System.currentTimeMillis(); + cachedConnection.setReserved(false); + } + existInCache = true; + break; + } + } + + // remove all unreserved cached connection when a sever has an error, not just the connection that was returned + if (ctsc.sawError) { + for (Iterator iterator = ccl.iterator(); iterator.hasNext();) { + CachedConnection cachedConnection = iterator.next(); + if (!cachedConnection.isReserved()) { + closeList.add(cachedConnection); + iterator.remove(); + } + } + } + } + + // close outside of sync block + for (CachedConnection cachedConnection : closeList) { + try { + cachedConnection.transport.close(); + } catch (Exception e) { + log.debug("Failed to close connection w/ errors", e); + } + } + + if (!existInCache) { + log.warn("Returned tablet server connection to cache that did not come from cache"); + // close outside of sync block + tsc.close(); + } + } + + /** + * Set the time after which idle connections should be closed + * + * @param time + */ + public synchronized void setIdleTime(long time) { + this.killTime = time; + log.debug("Set thrift transport pool idle time to " + time); + } + + private static ThriftTransportPool instance = new ThriftTransportPool(); + private static final AtomicBoolean daemonStarted = new AtomicBoolean(false); - private static AtomicBoolean stopDaemon; + + public static ThriftTransportPool getInstance() { + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + sm.checkPermission(TRANSPORT_POOL_PERMISSION); + } + + if (daemonStarted.compareAndSet(false, true)) { - stopDaemon = new AtomicBoolean(false); - new Daemon(new Closer(instance, stopDaemon), "Thrift Connection Pool Checker").start(); ++ new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start(); + } + return instance; + } - - public static void close() { - if (daemonStarted.compareAndSet(true, false)) { - stopDaemon.set(true); - } - } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f624d402/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java index d49c349,0000000..c0829df mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java @@@ -1,171 -1,0 +1,166 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.mock; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.util.ByteBufferUtil; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; + +/** + * Mock Accumulo provides an in memory implementation of the Accumulo client API. It is possible that the behavior of this implementation may differ subtly from + * the behavior of Accumulo. This could result in unit tests that pass on Mock Accumulo and fail on Accumulo or visa-versa. Documenting the differences would be + * difficult and is not done. + * + *

+ * An alternative to Mock Accumulo called MiniAccumuloCluster was introduced in Accumulo 1.5. MiniAccumuloCluster spins up actual Accumulo server processes, can + * be used for unit testing, and its behavior should match Accumulo. The drawback of MiniAccumuloCluster is that it starts more slowly than Mock Accumulo. + * + */ + +public class MockInstance implements Instance { + + static final String genericAddress = "localhost:1234"; + static final Map instances = new HashMap(); + MockAccumulo acu; + String instanceName; + + public MockInstance() { + acu = new MockAccumulo(getDefaultFileSystem()); + instanceName = "mock-instance"; + } + + static FileSystem getDefaultFileSystem() { + try { + Configuration conf = CachedConfiguration.getInstance(); + conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem"); + conf.set("fs.default.name", "file:///"); + return FileSystem.get(CachedConfiguration.getInstance()); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + public MockInstance(String instanceName) { + this(instanceName, getDefaultFileSystem()); + } + + public MockInstance(String instanceName, FileSystem fs) { + synchronized (instances) { + if (instances.containsKey(instanceName)) + acu = instances.get(instanceName); + else + instances.put(instanceName, acu = new MockAccumulo(fs)); + } + this.instanceName = instanceName; + } + + @Override + public String getRootTabletLocation() { + return genericAddress; + } + + @Override + public List getMasterLocations() { + return Collections.singletonList(genericAddress); + } + + @Override + public String getInstanceID() { + return "mock-instance-id"; + } + + @Override + public String getInstanceName() { + return instanceName; + } + + @Override + public String getZooKeepers() { + return "localhost"; + } + + @Override + public int getZooKeepersSessionTimeOut() { + return 30 * 1000; + } + + @Override + @Deprecated + public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException { + return getConnector(user, new PasswordToken(pass)); + } + + @Override + @Deprecated + public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException { + return getConnector(user, ByteBufferUtil.toBytes(pass)); + } + + @Override + @Deprecated + public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException { + return getConnector(user, TextUtil.getBytes(new Text(pass.toString()))); + } + + AccumuloConfiguration conf = null; + + @Override + public AccumuloConfiguration getConfiguration() { + if (conf == null) + conf = AccumuloConfiguration.getDefaultConfiguration(); + return conf; + } + + @Override + public void setConfiguration(AccumuloConfiguration conf) { + this.conf = conf; + } + + @Deprecated + @Override + public Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException { + return getConnector(auth.user, auth.password); + } + + @Override + public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException { + Connector conn = new MockConnector(principal, acu, this); + if (!acu.users.containsKey(principal)) + conn.securityOperations().createLocalUser(principal, (PasswordToken) token); + else if (!acu.users.get(principal).token.equals(token)) + throw new AccumuloSecurityException(principal, SecurityErrorCode.BAD_CREDENTIALS); + return conn; + } - - @Override - public void close() { - // NOOP - } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f624d402/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java index 881cdfc,0000000..9bffc81 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java @@@ -1,227 -1,0 +1,223 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.impl.ClientExec; +import org.apache.accumulo.core.client.impl.ClientExecReturn; +import org.apache.accumulo.core.client.impl.ThriftTransportPool; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.trace.instrument.Span; +import org.apache.accumulo.trace.instrument.Trace; +import org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.TServiceClient; +import org.apache.thrift.TServiceClientFactory; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TMessage; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; + + +public class ThriftUtil { + private static final Logger log = Logger.getLogger(ThriftUtil.class); + + public static class TraceProtocol extends TCompactProtocol { + + @Override + public void writeMessageBegin(TMessage message) throws TException { + Trace.start("client:" + message.name); + super.writeMessageBegin(message); + } + + @Override + public void writeMessageEnd() throws TException { + super.writeMessageEnd(); + Span currentTrace = Trace.currentTrace(); + if (currentTrace != null) + currentTrace.stop(); + } + + public TraceProtocol(TTransport transport) { + super(transport); + } + } + + public static class TraceProtocolFactory extends TCompactProtocol.Factory { + private static final long serialVersionUID = 1L; + + @Override + public TProtocol getProtocol(TTransport trans) { + return new TraceProtocol(trans); + } + } + + static private TProtocolFactory protocolFactory = new TraceProtocolFactory(); + static private TTransportFactory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE); + + static public T createClient(TServiceClientFactory factory, TTransport transport) { + return factory.getClient(protocolFactory.getProtocol(transport), protocolFactory.getProtocol(transport)); + } + + static public T getClient(TServiceClientFactory factory, InetSocketAddress address, AccumuloConfiguration conf) + throws TTransportException { + return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address, conf)); + } + + static public T getClient(TServiceClientFactory factory, String address, Property property, AccumuloConfiguration configuration) + throws TTransportException { + int port = configuration.getPort(property); + TTransport transport = ThriftTransportPool.getInstance().getTransport(address, port); + return createClient(factory, transport); + } + + static public T getClient(TServiceClientFactory factory, String address, Property property, Property timeoutProperty, + AccumuloConfiguration configuration) throws TTransportException { + return getClient(factory, address, property, configuration.getTimeInMillis(timeoutProperty), configuration); + } + + static public T getClient(TServiceClientFactory factory, String address, Property property, long timeout, + AccumuloConfiguration configuration) throws TTransportException { + int port = configuration.getPort(property); + TTransport transport = ThriftTransportPool.getInstance().getTransport(address, port, timeout); + return createClient(factory, transport); + } + + static public void returnClient(TServiceClient iface) { // Eew... the typing here is horrible + if (iface != null) { + ThriftTransportPool.getInstance().returnTransport(iface.getInputProtocol().getTransport()); + } + } + + static public TabletClientService.Client getTServerClient(String address, AccumuloConfiguration conf) throws TTransportException { + return getClient(new TabletClientService.Client.Factory(), address, Property.TSERV_CLIENTPORT, Property.GENERAL_RPC_TIMEOUT, conf); + } + + static public TabletClientService.Client getTServerClient(String address, AccumuloConfiguration conf, long timeout) throws TTransportException { + return getClient(new TabletClientService.Client.Factory(), address, Property.TSERV_CLIENTPORT, timeout, conf); + } + + public static void execute(String address, AccumuloConfiguration conf, ClientExec exec) throws AccumuloException, + AccumuloSecurityException { + while (true) { + TabletClientService.Client client = null; + try { + exec.execute(client = getTServerClient(address, conf)); + break; + } catch (TTransportException tte) { + log.debug("getTServerClient request failed, retrying ... ", tte); + UtilWaitThread.sleep(100); + } catch (ThriftSecurityException e) { + throw new AccumuloSecurityException(e.user, e.code, e); + } catch (Exception e) { + throw new AccumuloException(e); + } finally { + if (client != null) + returnClient(client); + } + } + } + + public static T execute(String address, AccumuloConfiguration conf, ClientExecReturn exec) throws AccumuloException, + AccumuloSecurityException { + while (true) { + TabletClientService.Client client = null; + try { + return exec.execute(client = getTServerClient(address, conf)); + } catch (TTransportException tte) { + log.debug("getTServerClient request failed, retrying ... ", tte); + UtilWaitThread.sleep(100); + } catch (ThriftSecurityException e) { + throw new AccumuloSecurityException(e.user, e.code, e); + } catch (Exception e) { + throw new AccumuloException(e); + } finally { + if (client != null) + returnClient(client); + } + } + } + + /** + * create a transport that is not pooled + */ + public static TTransport createTransport(String address, int port, AccumuloConfiguration conf) throws TException { + TTransport transport = null; + + try { + transport = TTimeoutTransport.create(org.apache.accumulo.core.util.AddressUtil.parseAddress(address, port), + conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); + transport = ThriftUtil.transportFactory().getTransport(transport); + transport.open(); + TTransport tmp = transport; + transport = null; + return tmp; + } catch (IOException ex) { + throw new TTransportException(ex); + } finally { + if (transport != null) + transport.close(); + } + + + } + + /** + * create a transport that is not pooled + */ + public static TTransport createTransport(InetSocketAddress address, AccumuloConfiguration conf) throws TException { + return createTransport(address.getAddress().getHostAddress(), address.getPort(), conf); + } + + public static TTransportFactory transportFactory() { + return transportFactory; + } + + private final static Map factoryCache = new HashMap(); + synchronized public static TTransportFactory transportFactory(int maxFrameSize) { + TTransportFactory factory = factoryCache.get(maxFrameSize); + if(factory == null) + { + factory = new TFramedTransport.Factory(maxFrameSize); + factoryCache.put(maxFrameSize,factory); + } + return factory; + } + + synchronized public static TTransportFactory transportFactory(long maxFrameSize) { + if(maxFrameSize > Integer.MAX_VALUE || maxFrameSize < 1) + throw new RuntimeException("Thrift transport frames are limited to "+Integer.MAX_VALUE); + return transportFactory((int)maxFrameSize); + } + + public static TProtocolFactory protocolFactory() { + return protocolFactory; + } - - public static void close() { - ThriftTransportPool.close(); - } +}