accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [3/4] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT
Date Mon, 06 Jan 2014 22:36:18 GMT
Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT

Conflicts:
	core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
	core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
	core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
	fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f624d402
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f624d402
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f624d402

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: f624d402e94d6959dffaab53b9afdd043bd4e8ea
Parents: c23126a e946ba0
Author: Keith Turner <kturner@apache.org>
Authored: Mon Jan 6 17:09:20 2014 -0500
Committer: Keith Turner <kturner@apache.org>
Committed: Mon Jan 6 17:09:20 2014 -0500

----------------------------------------------------------------------
 .../apache/accumulo/core/client/Instance.java   |  8 +---
 .../accumulo/core/client/ZooKeeperInstance.java | 50 ++------------------
 .../core/client/impl/ThriftTransportPool.java   | 16 ++-----
 .../accumulo/core/client/mock/MockInstance.java |  5 --
 .../apache/accumulo/core/util/ThriftUtil.java   |  4 --
 .../core/client/impl/TabletLocatorImplTest.java |  5 --
 .../accumulo/fate/zookeeper/ZooCache.java       |  7 ---
 .../accumulo/fate/zookeeper/ZooReader.java      | 12 -----
 .../accumulo/server/client/HdfsZooInstance.java |  5 --
 9 files changed, 9 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f624d402/core/src/main/java/org/apache/accumulo/core/client/Instance.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/Instance.java
index c67220d,0000000..c5b0a1e
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Instance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Instance.java
@@@ -1,172 -1,0 +1,166 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client;
 +
 +import java.nio.ByteBuffer;
 +import java.util.List;
 +
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +
 +/**
 + * This class represents the information a client needs to know to connect to an instance of accumulo.
 + * 
 + */
 +public interface Instance {
 +  /**
 +   * Returns the location of the tablet server that is serving the root tablet.
 +   * 
 +   * @return location in "hostname:port" form
 +   */
 +  public abstract String getRootTabletLocation();
 +
 +  /**
 +   * Returns the location(s) of the accumulo master and any redundant servers.
 +   * 
 +   * @return a list of locations in "hostname:port" form
 +   */
 +  public abstract List<String> getMasterLocations();
 +
 +  /**
 +   * Returns a unique string that identifies this instance of accumulo.
 +   * 
 +   * @return a UUID
 +   */
 +  public abstract String getInstanceID();
 +
 +  /**
 +   * Returns the instance name given at system initialization time.
 +   * 
 +   * @return current instance name
 +   */
 +  public abstract String getInstanceName();
 +
 +  /**
 +   * Returns a comma-separated list of zookeeper servers the instance is using.
 +   * 
 +   * @return the zookeeper servers this instance is using in "hostname:port" form
 +   */
 +  public abstract String getZooKeepers();
 +
 +  /**
 +   * Returns the zookeeper connection timeout.
 +   * 
 +   * @return the configured timeout to connect to zookeeper
 +   */
 +  public abstract int getZooKeepersSessionTimeOut();
 +
 +  /**
 +   * Returns a connection to accumulo.
 +   * 
 +   * @param user
 +   *          a valid accumulo user
 +   * @param pass
 +   *          A UTF-8 encoded password. The password may be cleared after making this call.
 +   * @return the accumulo Connector
 +   * @throws AccumuloException
 +   *           when a generic exception occurs
 +   * @throws AccumuloSecurityException
 +   *           when a user's credentials are invalid
 +   * @deprecated since 1.5, use {@link #getConnector(String, AuthenticationToken)} with {@link PasswordToken}
 +   */
 +  @Deprecated
 +  public abstract Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException;
 +
 +  /**
 +   * Returns a connection to accumulo.
 +   * 
 +   * @param auth
 +   *          A Credentials object.
 +   * @return the accumulo Connector
 +   * @throws AccumuloException
 +   *           when a generic exception occurs
 +   * @throws AccumuloSecurityException
 +   *           when a user's credentials are invalid
 +   * @deprecated since 1.5, use {@link #getConnector(String, AuthenticationToken)} with {@link PasswordToken}
 +   */
 +  @Deprecated
 +  public abstract Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException;
 +
 +  /**
 +   * Returns a connection to accumulo.
 +   * 
 +   * @param user
 +   *          a valid accumulo user
 +   * @param pass
 +   *          A UTF-8 encoded password. The password may be cleared after making this call.
 +   * @return the accumulo Connector
 +   * @throws AccumuloException
 +   *           when a generic exception occurs
 +   * @throws AccumuloSecurityException
 +   *           when a user's credentials are invalid
 +   * @deprecated since 1.5, use {@link #getConnector(String, AuthenticationToken)} with {@link PasswordToken}
 +   */
 +  @Deprecated
 +  public abstract Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException;
 +
 +  /**
 +   * Returns a connection to this instance of accumulo.
 +   * 
 +   * @param user
 +   *          a valid accumulo user
 +   * @param pass
 +   *          If a mutable CharSequence is passed in, it may be cleared after this call.
 +   * @return the accumulo Connector
 +   * @throws AccumuloException
 +   *           when a generic exception occurs
 +   * @throws AccumuloSecurityException
 +   *           when a user's credentials are invalid
 +   * @deprecated since 1.5, use {@link #getConnector(String, AuthenticationToken)} with {@link PasswordToken}
 +   */
 +  @Deprecated
 +  public abstract Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException;
- 
-   /**
-    * Closes up the instance to free up all associated resources. You should try to reuse an Instance as much as you can because there is some location caching
-    * stored which will enhance performance.
-    */
-   public abstract void close();
- 
++  
 +  /**
 +   * Returns the AccumuloConfiguration to use when interacting with this instance.
 +   * 
 +   * @return the AccumuloConfiguration that specifies properties related to interacting with this instance
 +   */
 +  public abstract AccumuloConfiguration getConfiguration();
 +
 +  /**
 +   * Set the AccumuloConfiguration to use when interacting with this instance.
 +   * 
 +   * @param conf
 +   *          accumulo configuration
 +   */
 +  public abstract void setConfiguration(AccumuloConfiguration conf);
 +
 +  /**
 +   * Returns a connection to this instance of accumulo.
 +   * 
 +   * @param principal
 +   *          a valid accumulo user
 +   * @param token
 +   *          Use the token type configured for the Accumulo instance you are connecting to. An Accumulo instance with default configurations will use
 +   *          {@link PasswordToken}
 +   * @since 1.5.0
 +   */
 +  public abstract Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException;
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f624d402/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
index d96091b,0000000..ccfb328
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
@@@ -1,353 -1,0 +1,313 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.UUID;
- import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.impl.ConnectorImpl;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.file.FileUtil;
 +import org.apache.accumulo.core.security.CredentialHelper;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.util.ArgumentChecker;
 +import org.apache.accumulo.core.util.ByteBufferUtil;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.OpTimer;
 +import org.apache.accumulo.core.util.TextUtil;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.ZooCache;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * <p>
 + * An implementation of instance that looks in zookeeper to find information needed to connect to an instance of accumulo.
 + * 
 + * <p>
 + * The advantage of using zookeeper to obtain information about accumulo is that zookeeper is highly available, very responsive, and supports caching.
 + * 
 + * <p>
 + * Because it is possible for multiple instances of accumulo to share a single set of zookeeper servers, all constructors require an accumulo instance name.
 + * 
 + * If you do not know the instance names then run accumulo org.apache.accumulo.server.util.ListInstances on an accumulo server.
 + * 
 + */
 +
 +public class ZooKeeperInstance implements Instance {
 +
 +  private static final Logger log = Logger.getLogger(ZooKeeperInstance.class);
 +
 +  private String instanceId = null;
 +  private String instanceName = null;
 +
 +  private final ZooCache zooCache;
 +
 +  private final String zooKeepers;
 +
 +  private final int zooKeepersSessionTimeOut;
 +
-   private volatile boolean closed = false;
- 
 +  /**
 +   * 
 +   * @param instanceName
 +   *          The name of a specific accumulo instance. This is set at initialization time.
 +   * @param zooKeepers
 +   *          A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
 +   */
 +
 +  public ZooKeeperInstance(String instanceName, String zooKeepers) {
 +    this(instanceName, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
 +  }
 +
 +  /**
 +   * 
 +   * @param instanceName
 +   *          The name of a specific accumulo instance. This is set at initialization time.
 +   * @param zooKeepers
 +   *          A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
 +   * @param sessionTimeout
 +   *          zoo keeper session time out in milliseconds.
 +   */
 +
 +  public ZooKeeperInstance(String instanceName, String zooKeepers, int sessionTimeout) {
 +    ArgumentChecker.notNull(instanceName, zooKeepers);
 +    this.instanceName = instanceName;
 +    this.zooKeepers = zooKeepers;
 +    this.zooKeepersSessionTimeOut = sessionTimeout;
 +    zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
 +    getInstanceID();
-     clientInstances.incrementAndGet();
 +  }
 +
 +  /**
 +   * 
 +   * @param instanceId
 +   *          The UUID that identifies the accumulo instance you want to connect to.
 +   * @param zooKeepers
 +   *          A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
 +   */
 +
 +  public ZooKeeperInstance(UUID instanceId, String zooKeepers) {
 +    this(instanceId, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
 +  }
 +
 +  /**
 +   * 
 +   * @param instanceId
 +   *          The UUID that identifies the accumulo instance you want to connect to.
 +   * @param zooKeepers
 +   *          A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
 +   * @param sessionTimeout
 +   *          zoo keeper session time out in milliseconds.
 +   */
 +
 +  public ZooKeeperInstance(UUID instanceId, String zooKeepers, int sessionTimeout) {
 +    ArgumentChecker.notNull(instanceId, zooKeepers);
 +    this.instanceId = instanceId.toString();
 +    this.zooKeepers = zooKeepers;
 +    this.zooKeepersSessionTimeOut = sessionTimeout;
 +    zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
-     clientInstances.incrementAndGet();
 +  }
 +
 +  @Override
-   public synchronized String getInstanceID() {
-     if (closed)
-       throw new RuntimeException("ZooKeeperInstance has been closed.");
++  public String getInstanceID() {
 +    if (instanceId == null) {
 +      // want the instance id to be stable for the life of this instance object,
 +      // so only get it once
 +      String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;
 +      byte[] iidb = zooCache.get(instanceNamePath);
 +      if (iidb == null) {
 +        throw new RuntimeException("Instance name " + instanceName
 +            + " does not exist in zookeeper.  Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list.");
 +      }
 +      instanceId = new String(iidb, Constants.UTF8);
 +    }
 +
 +    if (zooCache.get(Constants.ZROOT + "/" + instanceId) == null) {
 +      if (instanceName == null)
 +        throw new RuntimeException("Instance id " + instanceId + " does not exist in zookeeper");
 +      throw new RuntimeException("Instance id " + instanceId + " pointed to by the name " + instanceName + " does not exist in zookeeper");
 +    }
 +
 +    return instanceId;
 +  }
 +
 +  @Override
-   public synchronized List<String> getMasterLocations() {
-     if (closed)
-       throw new RuntimeException("ZooKeeperInstance has been closed.");
++  public List<String> getMasterLocations() {
 +    String masterLocPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK;
 +
 +    OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up master location in zoocache.");
 +    byte[] loc = ZooUtil.getLockData(zooCache, masterLocPath);
 +    opTimer.stop("Found master at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
 +
 +    if (loc == null) {
 +      return Collections.emptyList();
 +    }
 +
 +    return Collections.singletonList(new String(loc));
 +  }
 +
 +  @Override
-   public synchronized String getRootTabletLocation() {
-     if (closed)
-       throw new RuntimeException("ZooKeeperInstance has been closed.");
++  public String getRootTabletLocation() {
 +    String zRootLocPath = ZooUtil.getRoot(this) + Constants.ZROOT_TABLET_LOCATION;
 +
 +    OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zookeeper.");
 +    byte[] loc = zooCache.get(zRootLocPath);
 +    opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
 +
 +    if (loc == null) {
 +      return null;
 +    }
 +
 +    return new String(loc).split("\\|")[0];
 +  }
 +
 +  @Override
-   public synchronized String getInstanceName() {
-     if (closed)
-       throw new RuntimeException("ZooKeeperInstance has been closed.");
++  public String getInstanceName() {
 +    if (instanceName == null)
 +      instanceName = lookupInstanceName(zooCache, UUID.fromString(getInstanceID()));
 +
 +    return instanceName;
 +  }
 +
 +  @Override
 +  public String getZooKeepers() {
 +    return zooKeepers;
 +  }
 +
 +  @Override
 +  public int getZooKeepersSessionTimeOut() {
 +    return zooKeepersSessionTimeOut;
 +  }
 +
 +  @Override
 +  @Deprecated
 +  public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(user, TextUtil.getBytes(new Text(pass.toString())));
 +  }
 +
 +  @Override
 +  @Deprecated
 +  public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(user, ByteBufferUtil.toBytes(pass));
 +  }
 +
 +  @Override
 +  public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(CredentialHelper.create(principal, token, getInstanceID()));
 +  }
 +  
 +  @SuppressWarnings("deprecation")
 +  private Connector getConnector(TCredentials credential) throws AccumuloException, AccumuloSecurityException {
 +    return new ConnectorImpl(this, credential);
 +  }
 +  
 +  @Override
 +  @Deprecated
 +  public Connector getConnector(String principal, byte[] pass) throws AccumuloException, AccumuloSecurityException {
-     if (closed) {
-       throw new RuntimeException("ZooKeeperInstance has been closed.");
-     } else {
-       return getConnector(principal, new PasswordToken(pass));
-     }
++    return getConnector(principal, new PasswordToken(pass));
 +  }
 +
 +  private AccumuloConfiguration conf = null;
 +
 +  @Override
 +  public AccumuloConfiguration getConfiguration() {
 +    if (conf == null)
 +      conf = AccumuloConfiguration.getDefaultConfiguration();
 +    return conf;
 +  }
 +
 +  @Override
 +  public void setConfiguration(AccumuloConfiguration conf) {
 +    this.conf = conf;
 +  }
 +
 +  /**
 +   * @deprecated Use {@link #lookupInstanceName(org.apache.accumulo.fate.zookeeper.ZooCache, UUID)} instead
 +   */
 +  @Deprecated
 +  public static String lookupInstanceName(org.apache.accumulo.core.zookeeper.ZooCache zooCache, UUID instanceId) {
 +    return lookupInstanceName((ZooCache) zooCache, instanceId);
 +  }
 +  
 +  /**
 +   * Given a zooCache and instanceId, look up the instance name.
 +   * 
 +   * @param zooCache
 +   * @param instanceId
 +   * @return the instance name
 +   */
 +  public static String lookupInstanceName(ZooCache zooCache, UUID instanceId) {
 +    ArgumentChecker.notNull(zooCache, instanceId);
 +    for (String name : zooCache.getChildren(Constants.ZROOT + Constants.ZINSTANCES)) {
 +      String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + name;
 +      byte[] bytes = zooCache.get(instanceNamePath);
 +      UUID iid = UUID.fromString(new String(bytes, Constants.UTF8));
 +      if (iid.equals(instanceId)) {
 +        return name;
 +      }
 +    }
 +    return null;
 +  }
 +  
 +  /**
 +   * To be moved to server code. Only lives here to support certain client side utilities to minimize command-line options.
 +   */
 +  @Deprecated
 +  public static String getInstanceIDFromHdfs(Path instanceDirectory) {
 +    try {
 +      FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), AccumuloConfiguration.getSiteConfiguration());
 +      FileStatus[] files = null;
 +      try {
 +        files = fs.listStatus(instanceDirectory);
 +      } catch (FileNotFoundException ex) {
 +        // ignored
 +      }
 +      log.debug("Trying to read instance id from " + instanceDirectory);
 +      if (files == null || files.length == 0) {
 +        log.error("unable obtain instance id at " + instanceDirectory);
 +        throw new RuntimeException("Accumulo not initialized, there is no instance id at " + instanceDirectory);
 +      } else if (files.length != 1) {
 +        log.error("multiple potential instances in " + instanceDirectory);
 +        throw new RuntimeException("Accumulo found multiple possible instance ids in " + instanceDirectory);
 +      } else {
 +        String result = files[0].getPath().getName();
 +        return result;
 +      }
 +    } catch (IOException e) {
 +      throw new RuntimeException("Accumulo not initialized, there is no instance id at " + instanceDirectory, e);
 +    }
 +  }
 +  
 +  @Deprecated
 +  @Override
 +  public Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(auth.user, auth.password);
 +  }
- 
-   static private final AtomicInteger clientInstances = new AtomicInteger(0);
- 
-   @Override
-   public synchronized void close() {
-     if (!closed && clientInstances.decrementAndGet() == 0) {
-       try {
-         zooCache.close();
-         ThriftUtil.close();
-       } catch (RuntimeException e) {
-         clientInstances.incrementAndGet();
-         throw e;
-       }
-     }
-     closed = true;
-   }
- 
-   @Override
-   public void finalize() {
-     // This method intentionally left blank. Users need to explicitly close Instances if they want things cleaned up nicely.
-     if (!closed)
-       log.warn("ZooKeeperInstance being cleaned up without being closed. Please remember to call close() before dereferencing to clean up threads.");
-   }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f624d402/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index 41b2527,0000000..ceeab21
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@@ -1,617 -1,0 +1,607 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.impl;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.security.SecurityPermission;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.util.AddressUtil;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.TTimeoutTransport;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +
 +public class ThriftTransportPool {
 +  private static SecurityPermission TRANSPORT_POOL_PERMISSION = new SecurityPermission("transportPoolPermission");
 +  
 +  private static final Random random = new Random();
 +  private long killTime = 1000 * 3;
 +  
 +  private Map<ThriftTransportKey,List<CachedConnection>> cache = new HashMap<ThriftTransportKey,List<CachedConnection>>();
 +  private Map<ThriftTransportKey,Long> errorCount = new HashMap<ThriftTransportKey,Long>();
 +  private Map<ThriftTransportKey,Long> errorTime = new HashMap<ThriftTransportKey,Long>();
 +  private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<ThriftTransportKey>();
 +  
 +  private static final Logger log = Logger.getLogger(ThriftTransportPool.class);
 +  
 +  private static final Long ERROR_THRESHOLD = 20l;
 +  private static final int STUCK_THRESHOLD = 2 * 60 * 1000;
 +  
 +  private static class CachedConnection {
 +    
 +    public CachedConnection(CachedTTransport t) {
 +      this.transport = t;
 +    }
 +    
 +    void setReserved(boolean reserved) {
 +      this.transport.setReserved(reserved);
 +    }
 +    
 +    boolean isReserved() {
 +      return this.transport.reserved;
 +    }
 +    
 +    CachedTTransport transport;
 +    
 +    long lastReturnTime;
 +  }
 +  
 +  private static class Closer implements Runnable {
 +    final ThriftTransportPool pool;
-     final AtomicBoolean stop;
 +    
-     public Closer(ThriftTransportPool pool, AtomicBoolean stop) {
++    public Closer(ThriftTransportPool pool) {
 +      this.pool = pool;
-       this.stop = stop;
 +    }
 +    
 +    public void run() {
-       while (!stop.get()) {
++      while (true) {
 +        
 +        ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>();
 +        
 +        synchronized (pool) {
 +          for (List<CachedConnection> ccl : pool.cache.values()) {
 +            Iterator<CachedConnection> iter = ccl.iterator();
 +            while (iter.hasNext()) {
 +              CachedConnection cachedConnection = iter.next();
 +              
 +              if (!cachedConnection.isReserved() && System.currentTimeMillis() - cachedConnection.lastReturnTime > pool.killTime) {
 +                connectionsToClose.add(cachedConnection);
 +                iter.remove();
 +              }
 +            }
 +          }
 +          
 +          for (List<CachedConnection> ccl : pool.cache.values()) {
 +            for (CachedConnection cachedConnection : ccl) {
 +              cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD);
 +            }
 +          }
 +          
 +          Iterator<Entry<ThriftTransportKey,Long>> iter = pool.errorTime.entrySet().iterator();
 +          while (iter.hasNext()) {
 +            Entry<ThriftTransportKey,Long> entry = iter.next();
 +            long delta = System.currentTimeMillis() - entry.getValue();
 +            if (delta >= STUCK_THRESHOLD) {
 +              pool.errorCount.remove(entry.getKey());
 +              iter.remove();
 +            }
 +          }
 +        }
 +        
 +        // close connections outside of sync block
 +        for (CachedConnection cachedConnection : connectionsToClose) {
 +          cachedConnection.transport.close();
 +        }
 +        
 +        try {
 +          Thread.sleep(500);
 +        } catch (InterruptedException e) {
 +          e.printStackTrace();
 +        }
 +      }
 +    }
 +  }
 +  
 +  static class CachedTTransport extends TTransport {
 +    
 +    private ThriftTransportKey cacheKey;
 +    private TTransport wrappedTransport;
 +    private boolean sawError = false;
 +    
 +    private volatile String ioThreadName = null;
 +    private volatile long ioStartTime = 0;
 +    private volatile boolean reserved = false;
 +    
 +    private String stuckThreadName = null;
 +    
 +    int ioCount = 0;
 +    int lastIoCount = -1;
 +    
 +    private void sawError(Exception e) {
 +      sawError = true;
 +    }
 +    
 +    final void setReserved(boolean reserved) {
 +      this.reserved = reserved;
 +      if (reserved) {
 +        ioThreadName = Thread.currentThread().getName();
 +        ioCount = 0;
 +        lastIoCount = -1;
 +      } else {
 +        if ((ioCount & 1) == 1) {
 +          // connection unreserved, but it seems io may still be
 +          // happening
 +          log.warn("Connection returned to thrift connection pool that may still be in use " + ioThreadName + " " + Thread.currentThread().getName(),
 +              new Exception());
 +        }
 +        
 +        ioCount = 0;
 +        lastIoCount = -1;
 +        ioThreadName = null;
 +      }
 +      checkForStuckIO(STUCK_THRESHOLD);
 +    }
 +    
 +    final void checkForStuckIO(long threshold) {
 +      /*
 +       * checking for stuck io needs to be light weight.
 +       * 
 +       * Tried to call System.currentTimeMillis() and Thread.currentThread() before every io operation.... this dramatically slowed things down. So switched to
 +       * incrementing a counter before and after each io operation.
 +       */
 +      
 +      if ((ioCount & 1) == 1) {
 +        // when ioCount is odd, it means I/O is currently happening
 +        if (ioCount == lastIoCount) {
 +          // still doing same I/O operation as last time this
 +          // function was called
 +          long delta = System.currentTimeMillis() - ioStartTime;
 +          if (delta >= threshold && stuckThreadName == null) {
 +            stuckThreadName = ioThreadName;
 +            log.warn("Thread \"" + ioThreadName + "\" stuck on IO  to " + cacheKey + " for at least " + delta + " ms");
 +          }
 +        } else {
 +          // remember this ioCount and the time we saw it, need to see
 +          // if it changes
 +          lastIoCount = ioCount;
 +          ioStartTime = System.currentTimeMillis();
 +          
 +          if (stuckThreadName != null) {
 +            // doing I/O, but ioCount changed so no longer stuck
 +            log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO  to " + cacheKey + " sawError = " + sawError);
 +            stuckThreadName = null;
 +          }
 +        }
 +      } else {
 +        // I/O is not currently happening
 +        if (stuckThreadName != null) {
 +          // no longer stuck, and was stuck in the past
 +          log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO  to " + cacheKey + " sawError = " + sawError);
 +          stuckThreadName = null;
 +        }
 +      }
 +    }
 +    
 +    /**
 +     * Wraps {@code transport} and remembers the key under which the pool caches it.
 +     *
 +     * @param transport
 +     *          the transport that every call is delegated to
 +     * @param cacheKey2
 +     *          the pool key, later returned by {@link #getCacheKey()}
 +     */
 +    public CachedTTransport(TTransport transport, ThriftTransportKey cacheKey2) {
 +      this.cacheKey = cacheKey2;
 +      this.wrappedTransport = transport;
 +    }
 +    
 +    /**
 +     * Delegates to the wrapped transport. Unlike the I/O methods, this does not touch the I/O counter.
 +     *
 +     * @return whatever the wrapped transport reports
 +     */
 +    public boolean isOpen() {
 +      final boolean open = wrappedTransport.isOpen();
 +      return open;
 +    }
 +    
 +    /**
 +     * Opens the wrapped transport. The I/O counter is bumped on entry and on exit, and a {@link TTransportException} is recorded before it is rethrown.
 +     *
 +     * @throws TTransportException
 +     *           if the wrapped transport fails to open
 +     */
 +    public void open() throws TTransportException {
 +      ioCount++; // entering the call
 +      try {
 +        wrappedTransport.open();
 +      } catch (TTransportException failure) {
 +        sawError(failure);
 +        throw failure;
 +      } finally {
 +        ioCount++; // leaving the call
 +      }
 +    }
 +    
 +    /**
 +     * Delegates to the wrapped transport's {@code read}. The I/O counter is bumped on entry and on exit, and a {@link TTransportException} is recorded before
 +     * it is rethrown.
 +     *
 +     * @return the value returned by the wrapped transport
 +     * @throws TTransportException
 +     *           if the wrapped transport fails
 +     */
 +    public int read(byte[] arg0, int arg1, int arg2) throws TTransportException {
 +      ioCount++; // entering the call
 +      try {
 +        int result = wrappedTransport.read(arg0, arg1, arg2);
 +        return result;
 +      } catch (TTransportException failure) {
 +        sawError(failure);
 +        throw failure;
 +      } finally {
 +        ioCount++; // leaving the call
 +      }
 +    }
 +    
 +    /**
 +     * Delegates to the wrapped transport's {@code readAll}. The I/O counter is bumped on entry and on exit, and a {@link TTransportException} is recorded
 +     * before it is rethrown.
 +     *
 +     * @return the value returned by the wrapped transport
 +     * @throws TTransportException
 +     *           if the wrapped transport fails
 +     */
 +    public int readAll(byte[] arg0, int arg1, int arg2) throws TTransportException {
 +      ioCount++; // entering the call
 +      try {
 +        int result = wrappedTransport.readAll(arg0, arg1, arg2);
 +        return result;
 +      } catch (TTransportException failure) {
 +        sawError(failure);
 +        throw failure;
 +      } finally {
 +        ioCount++; // leaving the call
 +      }
 +    }
 +    
 +    /**
 +     * Delegates to the wrapped transport's three-argument {@code write}. The I/O counter is bumped on entry and on exit, and a {@link TTransportException} is
 +     * recorded before it is rethrown.
 +     *
 +     * @throws TTransportException
 +     *           if the wrapped transport fails
 +     */
 +    public void write(byte[] arg0, int arg1, int arg2) throws TTransportException {
 +      ioCount++; // entering the call
 +      try {
 +        wrappedTransport.write(arg0, arg1, arg2);
 +      } catch (TTransportException failure) {
 +        sawError(failure);
 +        throw failure;
 +      } finally {
 +        ioCount++; // leaving the call
 +      }
 +    }
 +    
 +    /**
 +     * Delegates to the wrapped transport's single-argument {@code write}. The I/O counter is bumped on entry and on exit, and a {@link TTransportException} is
 +     * recorded before it is rethrown.
 +     *
 +     * @throws TTransportException
 +     *           if the wrapped transport fails
 +     */
 +    public void write(byte[] arg0) throws TTransportException {
 +      ioCount++; // entering the call
 +      try {
 +        wrappedTransport.write(arg0);
 +      } catch (TTransportException failure) {
 +        sawError(failure);
 +        throw failure;
 +      } finally {
 +        ioCount++; // leaving the call
 +      }
 +    }
 +    
 +    /**
 +     * Closes the wrapped transport, bumping the I/O counter on entry and on exit. Failures are not recorded via {@code sawError} here.
 +     */
 +    public void close() {
 +      ioCount++; // entering the call
 +      try {
 +        wrappedTransport.close();
 +      } finally {
 +        ioCount++; // leaving the call
 +      }
 +    }
 +    
 +    /**
 +     * Flushes the wrapped transport. The I/O counter is bumped on entry and on exit, and a {@link TTransportException} is recorded before it is rethrown.
 +     *
 +     * @throws TTransportException
 +     *           if the wrapped transport fails to flush
 +     */
 +    public void flush() throws TTransportException {
 +      ioCount++; // entering the call
 +      try {
 +        wrappedTransport.flush();
 +      } catch (TTransportException failure) {
 +        sawError(failure);
 +        throw failure;
 +      } finally {
 +        ioCount++; // leaving the call
 +      }
 +    }
 +    
 +    /**
 +     * Delegates to the wrapped transport's {@code peek}, bumping the I/O counter on entry and on exit.
 +     *
 +     * @return the value returned by the wrapped transport
 +     */
 +    public boolean peek() {
 +      ioCount++; // entering the call
 +      try {
 +        boolean result = wrappedTransport.peek();
 +        return result;
 +      } finally {
 +        ioCount++; // leaving the call
 +      }
 +    }
 +    
 +    /**
 +     * Delegates to the wrapped transport's {@code getBuffer}, bumping the I/O counter on entry and on exit.
 +     *
 +     * @return the value returned by the wrapped transport
 +     */
 +    public byte[] getBuffer() {
 +      ioCount++; // entering the call
 +      try {
 +        byte[] buffer = wrappedTransport.getBuffer();
 +        return buffer;
 +      } finally {
 +        ioCount++; // leaving the call
 +      }
 +    }
 +    
 +    /**
 +     * Delegates to the wrapped transport's {@code getBufferPosition}, bumping the I/O counter on entry and on exit.
 +     *
 +     * @return the value returned by the wrapped transport
 +     */
 +    public int getBufferPosition() {
 +      ioCount++; // entering the call
 +      try {
 +        int position = wrappedTransport.getBufferPosition();
 +        return position;
 +      } finally {
 +        ioCount++; // leaving the call
 +      }
 +    }
 +    
 +    /**
 +     * Delegates to the wrapped transport's {@code getBytesRemainingInBuffer}, bumping the I/O counter on entry and on exit.
 +     *
 +     * @return the value returned by the wrapped transport
 +     */
 +    public int getBytesRemainingInBuffer() {
 +      ioCount++; // entering the call
 +      try {
 +        int remaining = wrappedTransport.getBytesRemainingInBuffer();
 +        return remaining;
 +      } finally {
 +        ioCount++; // leaving the call
 +      }
 +    }
 +    
 +    /**
 +     * Delegates to the wrapped transport's {@code consumeBuffer}, bumping the I/O counter on entry and on exit.
 +     *
 +     * @param len
 +     *          passed through unchanged to the wrapped transport
 +     */
 +    public void consumeBuffer(int len) {
 +      ioCount++; // entering the call
 +      try {
 +        wrappedTransport.consumeBuffer(len);
 +      } finally {
 +        ioCount++; // leaving the call
 +      }
 +    }
 +    
 +    /**
 +     * @return the pool key that was supplied to the constructor
 +     */
 +    public ThriftTransportKey getCacheKey() {
 +      return this.cacheKey;
 +    }
 +    
 +  }
 +  
 +  /** Private so the pool cannot be constructed from outside this class; {@link #getInstance()} hands out the shared instance. */
 +  private ThriftTransportPool() {
 +    // nothing to initialize here
 +  }
 +  
 +  /**
 +   * Obtains a transport to {@code location:port} using a timeout value of 0, which {@code createNewTransport} treats as "no timeout transport".
 +   *
 +   * @throws TTransportException
 +   *           if no transport could be obtained
 +   */
 +  public TTransport getTransport(String location, int port) throws TTransportException {
 +    final int noTimeout = 0;
 +    return getTransport(location, port, noTimeout);
 +  }
 +  
 +  /**
 +   * Obtains a transport to {@code addr}, taking the timeout from {@code Property.GENERAL_RPC_TIMEOUT} in {@code conf}.
 +   *
 +   * @throws TTransportException
 +   *           if no transport could be obtained
 +   */
 +  public TTransport getTransportWithDefaultTimeout(InetSocketAddress addr, AccumuloConfiguration conf) throws TTransportException {
 +    String host = addr.getAddress().getHostAddress();
 +    int port = addr.getPort();
 +    long timeout = conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
 +    return getTransport(host, port, timeout);
 +  }
 +  
 +  /**
 +   * Obtains a transport to {@code addr} with the given timeout in milliseconds.
 +   *
 +   * @throws TTransportException
 +   *           if no transport could be obtained
 +   */
 +  public TTransport getTransport(InetSocketAddress addr, long timeout) throws TTransportException {
 +    String host = addr.getAddress().getHostAddress();
 +    int port = addr.getPort();
 +    return getTransport(host, port, timeout);
 +  }
 +  
 +  /**
 +   * Obtains a transport to {@code location:port}, taking the timeout from {@code Property.GENERAL_RPC_TIMEOUT} in {@code conf}.
 +   *
 +   * @throws TTransportException
 +   *           if no transport could be obtained
 +   */
 +  public TTransport getTransportWithDefaultTimeout(String location, int port, AccumuloConfiguration conf) throws TTransportException {
 +    long timeout = conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
 +    return getTransport(location, port, timeout);
 +  }
 +  
 +  /**
 +   * Reserves or opens a transport to any one of {@code servers}.
 +   *
 +   * <p>
 +   * With {@code preferCachedConnection} set, every candidate that has cached connections is tried first, in shuffled order, and the first unreserved connection
 +   * wins. Otherwise, and whenever no cached connection was available, servers are picked at random; a server whose connection attempt fails is dropped from
 +   * the candidate list, and the method gives up after 10 failures or when no candidates remain.
 +   *
 +   * @param servers
 +   *          candidate servers; the list is copied and the caller's list is not modified
 +   * @param preferCachedConnection
 +   *          whether to scan the whole cache for a usable connection before opening new ones
 +   * @return the chosen server as {@code "location:port"} paired with its reserved transport
 +   * @throws TTransportException
 +   *           if no transport could be reserved or opened
 +   */
 +  Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException {
 +
 +    // work on a private copy because failed servers get removed below
 +    servers = new ArrayList<ThriftTransportKey>(servers);
 +
 +    if (preferCachedConnection) {
 +      HashSet<ThriftTransportKey> candidatesInCache = new HashSet<ThriftTransportKey>(servers);
 +
 +      synchronized (this) {
 +        // keep only the candidates that have an entry in the connection cache
 +        candidatesInCache.retainAll(cache.keySet());
 +
 +        if (!candidatesInCache.isEmpty()) {
 +          ArrayList<ThriftTransportKey> shuffled = new ArrayList<ThriftTransportKey>(candidatesInCache);
 +          Collections.shuffle(shuffled, random);
 +
 +          for (ThriftTransportKey key : shuffled) {
 +            for (CachedConnection candidate : cache.get(key)) {
 +              if (candidate.isReserved()) {
 +                continue;
 +              }
 +              candidate.setReserved(true);
 +              if (log.isTraceEnabled())
 +                log.trace("Using existing connection to " + key.getLocation() + ":" + key.getPort());
 +              return new Pair<String,TTransport>(key.getLocation() + ":" + key.getPort(), candidate.transport);
 +            }
 +          }
 +        }
 +      }
 +    }
 +
 +    for (int failures = 0; !servers.isEmpty() && failures < 10; failures++) {
 +      int pick = random.nextInt(servers.size());
 +      ThriftTransportKey key = servers.get(pick);
 +
 +      if (!preferCachedConnection) {
 +        synchronized (this) {
 +          List<CachedConnection> pooled = cache.get(key);
 +          if (pooled != null) {
 +            for (CachedConnection candidate : pooled) {
 +              if (candidate.isReserved()) {
 +                continue;
 +              }
 +              candidate.setReserved(true);
 +              if (log.isTraceEnabled())
 +                log.trace("Using existing connection to " + key.getLocation() + ":" + key.getPort() + " timeout " + key.getTimeout());
 +              return new Pair<String,TTransport>(key.getLocation() + ":" + key.getPort(), candidate.transport);
 +            }
 +          }
 +        }
 +      }
 +
 +      try {
 +        return new Pair<String,TTransport>(key.getLocation() + ":" + key.getPort(), createNewTransport(key));
 +      } catch (TTransportException tte) {
 +        // this server is unreachable: drop it and let the loop header count the failure
 +        log.debug("Failed to connect to " + servers.get(pick), tte);
 +        servers.remove(pick);
 +      }
 +    }
 +
 +    throw new TTransportException("Failed to connect to a server");
 +  }
 +  
 +  /**
 +   * Obtains a transport for the key built from {@code location}, {@code port} and {@code milliseconds}.
 +   *
 +   * @throws TTransportException
 +   *           if no transport could be obtained
 +   */
 +  public TTransport getTransport(String location, int port, long milliseconds) throws TTransportException {
 +    ThriftTransportKey key = new ThriftTransportKey(location, port, milliseconds);
 +    return getTransport(key);
 +  }
 +  
 +  /**
 +   * Reserves an unreserved cached connection for {@code cacheKey} if one exists, and otherwise opens a new one. An empty list is registered in the cache for
 +   * keys that had no entry yet.
 +   *
 +   * @throws TTransportException
 +   *           if a new transport had to be created and that failed
 +   */
 +  private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
 +    synchronized (this) {
 +      // lookup and reservation happen under one lock so a connection cannot be handed out twice
 +      List<CachedConnection> pooled = cache.get(cacheKey);
 +      if (pooled == null) {
 +        pooled = new LinkedList<CachedConnection>();
 +        cache.put(cacheKey, pooled);
 +      }
 +
 +      for (CachedConnection candidate : pooled) {
 +        if (candidate.isReserved()) {
 +          continue;
 +        }
 +        candidate.setReserved(true);
 +        if (log.isTraceEnabled())
 +          log.trace("Using existing connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
 +        return candidate.transport;
 +      }
 +    }
 +
 +    // nothing reusable was found; connect outside of the lock
 +    return createNewTransport(cacheKey);
 +  }
 +  
 +  /**
 +   * Opens a new connection for {@code cacheKey}, wraps it in a {@code CachedTTransport}, registers it in the cache as reserved, and returns it.
 +   *
 +   * <p>
 +   * A timeout of 0 in the key selects {@code AddressUtil.createTSocket}; any other value selects {@code TTimeoutTransport.create}. If opening the transport
 +   * fails it is closed before the exception propagates, matching what {@code ThriftUtil.createTransport} does.
 +   *
 +   * @param cacheKey
 +   *          location, port and timeout of the server to connect to
 +   * @return the new transport, already reserved for the caller
 +   * @throws TTransportException
 +   *           if the transport cannot be created or opened
 +   */
 +  private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException {
 +    TTransport transport;
 +    if (cacheKey.getTimeout() == 0) {
 +      transport = AddressUtil.createTSocket(cacheKey.getLocation(), cacheKey.getPort());
 +    } else {
 +      try {
 +        transport = TTimeoutTransport.create(AddressUtil.parseAddress(cacheKey.getLocation(), cacheKey.getPort()), cacheKey.getTimeout());
 +      } catch (IOException ex) {
 +        throw new TTransportException(ex);
 +      }
 +    }
 +    transport = ThriftUtil.transportFactory().getTransport(transport);
 +    try {
 +      transport.open();
 +    } catch (TTransportException tte) {
 +      // do not leave a half-set-up transport behind when the open fails
 +      transport.close();
 +      throw tte;
 +    }
 +
 +    if (log.isTraceEnabled())
 +      log.trace("Creating new connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
 +
 +    CachedTTransport tsc = new CachedTTransport(transport, cacheKey);
 +
 +    // reserve before publishing so no other thread can grab the connection
 +    CachedConnection cc = new CachedConnection(tsc);
 +    cc.setReserved(true);
 +
 +    synchronized (this) {
 +      List<CachedConnection> ccl = cache.get(cacheKey);
 +
 +      if (ccl == null) {
 +        ccl = new LinkedList<CachedConnection>();
 +        cache.put(cacheKey, ccl);
 +      }
 +
 +      ccl.add(cc);
 +    }
 +    return cc.transport;
 +  }
 +  
 +  /**
 +   * Hands a transport back to the pool.
 +   *
 +   * <p>
 +   * A healthy connection is unreserved and its return time recorded. A connection that saw an error is removed from the cache and closed, the error
 +   * bookkeeping for its server is updated, and every other unreserved connection to the same server is closed as well. A transport that is not tracked by the
 +   * cache (including one that is not a {@code CachedTTransport} at all) is logged and closed.
 +   *
 +   * @param tsc
 +   *          the transport to return; {@code null} is ignored
 +   */
 +  public void returnTransport(TTransport tsc) {
 +    if (tsc == null) {
 +      return;
 +    }
 +
 +    if (!(tsc instanceof CachedTTransport)) {
 +      // a blind cast would throw ClassCastException here; treat a foreign transport like
 +      // any other connection the cache does not know about
 +      log.warn("Returned tablet server connection to cache that did not come from cache");
 +      tsc.close();
 +      return;
 +    }
 +
 +    boolean existInCache = false;
 +    CachedTTransport ctsc = (CachedTTransport) tsc;
 +
 +    ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
 +
 +    synchronized (this) {
 +      List<CachedConnection> ccl = cache.get(ctsc.getCacheKey());
 +      // no list for this key means the connection cannot be in the cache
 +      if (ccl != null) {
 +        for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
 +          CachedConnection cachedConnection = iterator.next();
 +          if (cachedConnection.transport == tsc) {
 +            if (ctsc.sawError) {
 +              closeList.add(cachedConnection);
 +              iterator.remove();
 +
 +              if (log.isTraceEnabled())
 +                log.trace("Returned connection had error " + ctsc.getCacheKey());
 +
 +              Long ecount = errorCount.get(ctsc.getCacheKey());
 +              if (ecount == null)
 +                ecount = 0L;
 +              ecount++;
 +              errorCount.put(ctsc.getCacheKey(), ecount);
 +
 +              // only the time of the first recorded error is kept
 +              Long etime = errorTime.get(ctsc.getCacheKey());
 +              if (etime == null) {
 +                errorTime.put(ctsc.getCacheKey(), System.currentTimeMillis());
 +              }
 +
 +              if (ecount >= ERROR_THRESHOLD && !serversWarnedAbout.contains(ctsc.getCacheKey())) {
 +                log.warn("Server " + ctsc.getCacheKey() + " had " + ecount + " failures in a short time period, will not complain anymore ");
 +                serversWarnedAbout.add(ctsc.getCacheKey());
 +              }
 +
 +              cachedConnection.setReserved(false);
 +
 +            } else {
 +
 +              if (log.isTraceEnabled())
 +                log.trace("Returned connection " + ctsc.getCacheKey() + " ioCount : " + cachedConnection.transport.ioCount);
 +
 +              cachedConnection.lastReturnTime = System.currentTimeMillis();
 +              cachedConnection.setReserved(false);
 +            }
 +            existInCache = true;
 +            break;
 +          }
 +        }
 +
 +        // remove all unreserved cached connections when a server has an error, not just the connection that was returned
 +        if (ctsc.sawError) {
 +          for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
 +            CachedConnection cachedConnection = iterator.next();
 +            if (!cachedConnection.isReserved()) {
 +              closeList.add(cachedConnection);
 +              iterator.remove();
 +            }
 +          }
 +        }
 +      }
 +    }
 +
 +    // close outside of sync block
 +    for (CachedConnection cachedConnection : closeList) {
 +      try {
 +        cachedConnection.transport.close();
 +      } catch (Exception e) {
 +        log.debug("Failed to close connection w/ errors", e);
 +      }
 +    }
 +
 +    if (!existInCache) {
 +      log.warn("Returned tablet server connection to cache that did not come from cache");
 +      // close outside of sync block
 +      tsc.close();
 +    }
 +  }
 +  
 +  /**
 +   * Sets the time after which idle connections should be closed.
 +   *
 +   * @param time
 +   *          the new idle limit, stored in {@code killTime}; presumably milliseconds, like the other timestamps in this class (confirm against the code that
 +   *          reads {@code killTime})
 +   */
 +  public synchronized void setIdleTime(long time) {
 +    killTime = time;
 +    log.debug("Set thrift transport pool idle time to " + time);
 +  }
 +
 +  private static ThriftTransportPool instance = new ThriftTransportPool();
 +  private static final AtomicBoolean daemonStarted = new AtomicBoolean(false);
-   private static AtomicBoolean stopDaemon;
 +  
 +  /**
 +   * Returns the shared pool held in the static {@code instance} field.
 +   * 
 +   * <p>
 +   * When a {@link SecurityManager} is installed, the caller must hold {@code TRANSPORT_POOL_PERMISSION}. The call that flips {@code daemonStarted} from false
 +   * to true also starts a {@code Daemon} named "Thrift Connection Pool Checker" that runs a {@code Closer} over the pool.
 +   * 
 +   * @return the shared {@code ThriftTransportPool}
 +   */
 +  public static ThriftTransportPool getInstance() {
 +    SecurityManager sm = System.getSecurityManager();
 +    if (sm != null) {
 +      sm.checkPermission(TRANSPORT_POOL_PERMISSION);
 +    }
 +    
 +    // compareAndSet lets only one caller win, so the checker is not started again while daemonStarted stays true
 +    if (daemonStarted.compareAndSet(false, true)) {
-       stopDaemon = new AtomicBoolean(false);
-       new Daemon(new Closer(instance, stopDaemon), "Thrift Connection Pool Checker").start();
++      new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
 +    }
 +    return instance;
 +  }
-   
-   public static void close() {
-     if (daemonStarted.compareAndSet(true, false)) {
-       stopDaemon.set(true);
-     }
-   }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f624d402/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
index d49c349,0000000..c0829df
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
@@@ -1,171 -1,0 +1,166 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.mock;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.util.ByteBufferUtil;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.core.util.TextUtil;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.io.Text;
 +
 +/**
 + * An in-memory implementation of the Accumulo client API. Its behavior may differ subtly from real Accumulo, so a unit test can pass against the mock and
 + * fail against Accumulo, or vice versa. Documenting the differences would be difficult and is not done.
 + *
 + * <p>
 + * MiniAccumuloCluster, introduced in Accumulo 1.5, is an alternative that spins up actual Accumulo server processes. It can be used for unit testing and its
 + * behavior should match Accumulo; its drawback is that it starts more slowly than Mock Accumulo.
 + */
 +
 +public class MockInstance implements Instance {
 +
 +  static final String genericAddress = "localhost:1234";
 +  static final Map<String,MockAccumulo> instances = new HashMap<String,MockAccumulo>();
 +  MockAccumulo acu;
 +  String instanceName;
 +  AccumuloConfiguration conf = null;
 +
 +  /** Creates a private mock named "mock-instance"; it is not registered in the shared {@code instances} map. */
 +  public MockInstance() {
 +    this.acu = new MockAccumulo(getDefaultFileSystem());
 +    this.instanceName = "mock-instance";
 +  }
 +
 +  /** Same as {@link #MockInstance(String, FileSystem)} with the file system from {@link #getDefaultFileSystem()}. */
 +  public MockInstance(String instanceName) {
 +    this(instanceName, getDefaultFileSystem());
 +  }
 +
 +  /**
 +   * Attaches to the mock registered under {@code instanceName}, creating and registering one backed by {@code fs} if the name is not registered yet.
 +   */
 +  public MockInstance(String instanceName, FileSystem fs) {
 +    synchronized (instances) {
 +      if (!instances.containsKey(instanceName)) {
 +        acu = new MockAccumulo(fs);
 +        instances.put(instanceName, acu);
 +      } else {
 +        acu = instances.get(instanceName);
 +      }
 +    }
 +    this.instanceName = instanceName;
 +  }
 +
 +  /**
 +   * Points the cached Hadoop configuration at the local file system and returns the file system obtained from it.
 +   *
 +   * @throws RuntimeException
 +   *           wrapping any {@link IOException} raised while obtaining the file system
 +   */
 +  static FileSystem getDefaultFileSystem() {
 +    try {
 +      Configuration hadoopConf = CachedConfiguration.getInstance();
 +      hadoopConf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
 +      hadoopConf.set("fs.default.name", "file:///");
 +      return FileSystem.get(CachedConfiguration.getInstance());
 +    } catch (IOException ioe) {
 +      throw new RuntimeException(ioe);
 +    }
 +  }
 +
 +  /** @return the fixed placeholder address shared by all mock instances */
 +  @Override
 +  public String getRootTabletLocation() {
 +    return genericAddress;
 +  }
 +
 +  /** @return a one-element list holding the fixed placeholder address */
 +  @Override
 +  public List<String> getMasterLocations() {
 +    return Collections.singletonList(genericAddress);
 +  }
 +
 +  /** @return the constant id "mock-instance-id" */
 +  @Override
 +  public String getInstanceID() {
 +    return "mock-instance-id";
 +  }
 +
 +  /** @return the name this instance was created with */
 +  @Override
 +  public String getInstanceName() {
 +    return this.instanceName;
 +  }
 +
 +  /** @return the constant "localhost" */
 +  @Override
 +  public String getZooKeepers() {
 +    return "localhost";
 +  }
 +
 +  /** @return a fixed 30 seconds, expressed in milliseconds */
 +  @Override
 +  public int getZooKeepersSessionTimeOut() {
 +    return 30 * 1000;
 +  }
 +
 +  /** Wraps the password bytes in a {@link PasswordToken} and delegates to the token-based overload. */
 +  @Override
 +  @Deprecated
 +  public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException {
 +    AuthenticationToken token = new PasswordToken(pass);
 +    return getConnector(user, token);
 +  }
 +
 +  /** Converts the buffer to bytes and delegates to the {@code byte[]} overload. */
 +  @Override
 +  @Deprecated
 +  public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
 +    byte[] passwordBytes = ByteBufferUtil.toBytes(pass);
 +    return getConnector(user, passwordBytes);
 +  }
 +
 +  /** Converts the characters to bytes by way of a Hadoop {@link Text} and delegates to the {@code byte[]} overload. */
 +  @Override
 +  @Deprecated
 +  public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
 +    byte[] passwordBytes = TextUtil.getBytes(new Text(pass.toString()));
 +    return getConnector(user, passwordBytes);
 +  }
 +
 +  /** @return the configuration set earlier, or the default configuration, which is looked up and kept on first use */
 +  @Override
 +  public AccumuloConfiguration getConfiguration() {
 +    if (conf != null) {
 +      return conf;
 +    }
 +    conf = AccumuloConfiguration.getDefaultConfiguration();
 +    return conf;
 +  }
 +
 +  /** Replaces the configuration returned by {@link #getConfiguration()}. */
 +  @Override
 +  public void setConfiguration(AccumuloConfiguration conf) {
 +    this.conf = conf;
 +  }
 +
 +  /** Delegates to the overload selected by the types of {@code auth.user} and {@code auth.password}. */
 +  @Override
 +  @Deprecated
 +  public Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
 +    return getConnector(auth.user, auth.password);
 +  }
 +
 +  /**
 +   * Returns a connector for {@code principal}. An unknown principal is created as a local user with the given token, which is cast to {@link PasswordToken};
 +   * a known principal must present a token equal to the stored one.
 +   *
 +   * @throws AccumuloSecurityException
 +   *           with {@code BAD_CREDENTIALS} when a known principal presents a different token
 +   */
 +  @Override
 +  public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
 +    Connector connector = new MockConnector(principal, acu, this);
 +    boolean knownUser = acu.users.containsKey(principal);
 +    if (!knownUser) {
 +      connector.securityOperations().createLocalUser(principal, (PasswordToken) token);
 +    } else if (!acu.users.get(principal).token.equals(token)) {
 +      throw new AccumuloSecurityException(principal, SecurityErrorCode.BAD_CREDENTIALS);
 +    }
 +    return connector;
 +  }
- 
-   @Override
-   public void close() {
-     // NOOP
-   }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f624d402/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
index 881cdfc,0000000..9bffc81
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
@@@ -1,227 -1,0 +1,223 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.util;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.util.HashMap;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.impl.ClientExec;
 +import org.apache.accumulo.core.client.impl.ClientExecReturn;
 +import org.apache.accumulo.core.client.impl.ThriftTransportPool;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.TServiceClient;
 +import org.apache.thrift.TServiceClientFactory;
 +import org.apache.thrift.protocol.TCompactProtocol;
 +import org.apache.thrift.protocol.TMessage;
 +import org.apache.thrift.protocol.TProtocol;
 +import org.apache.thrift.protocol.TProtocolFactory;
 +import org.apache.thrift.transport.TFramedTransport;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.thrift.transport.TTransportFactory;
 +
 +
 +/**
 + * Static helpers for creating Thrift clients and transports. Pooled transports are borrowed from and returned to {@link ThriftTransportPool}.
 + */
 +public class ThriftUtil {
 +  private static final Logger log = Logger.getLogger(ThriftUtil.class);
 +
 +  /** Compact protocol that starts a trace span when a message begins and stops the current span once the message has been written. */
 +  public static class TraceProtocol extends TCompactProtocol {
 +
 +    public TraceProtocol(TTransport transport) {
 +      super(transport);
 +    }
 +
 +    @Override
 +    public void writeMessageBegin(TMessage message) throws TException {
 +      Trace.start("client:" + message.name);
 +      super.writeMessageBegin(message);
 +    }
 +
 +    @Override
 +    public void writeMessageEnd() throws TException {
 +      super.writeMessageEnd();
 +      Span active = Trace.currentTrace();
 +      if (active == null) {
 +        return;
 +      }
 +      active.stop();
 +    }
 +  }
 +
 +  /** Factory that hands out {@link TraceProtocol} instances. */
 +  public static class TraceProtocolFactory extends TCompactProtocol.Factory {
 +    private static final long serialVersionUID = 1L;
 +
 +    @Override
 +    public TProtocol getProtocol(TTransport trans) {
 +      return new TraceProtocol(trans);
 +    }
 +  }
 +
 +  private static TProtocolFactory protocolFactory = new TraceProtocolFactory();
 +  private static TTransportFactory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE);
 +
 +  /** Builds a client over {@code transport}, using one freshly created protocol for input and another for output. */
 +  public static <T extends TServiceClient> T createClient(TServiceClientFactory<T> factory, TTransport transport) {
 +    TProtocol input = protocolFactory.getProtocol(transport);
 +    TProtocol output = protocolFactory.getProtocol(transport);
 +    return factory.getClient(input, output);
 +  }
 +
 +  /** Builds a client on a pooled transport to {@code address}, using the pool's default-timeout lookup. */
 +  public static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, InetSocketAddress address, AccumuloConfiguration conf)
 +      throws TTransportException {
 +    TTransport pooled = ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address, conf);
 +    return createClient(factory, pooled);
 +  }
 +
 +  /** Builds a client on a pooled transport to {@code address}; the port is read from {@code property} and no timeout is passed to the pool. */
 +  public static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, Property property, AccumuloConfiguration configuration)
 +      throws TTransportException {
 +    int port = configuration.getPort(property);
 +    TTransport pooled = ThriftTransportPool.getInstance().getTransport(address, port);
 +    return createClient(factory, pooled);
 +  }
 +
 +  /** Same as the {@code long timeout} overload, with the timeout read from {@code timeoutProperty}. */
 +  public static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, Property property, Property timeoutProperty,
 +      AccumuloConfiguration configuration) throws TTransportException {
 +    long timeout = configuration.getTimeInMillis(timeoutProperty);
 +    return getClient(factory, address, property, timeout, configuration);
 +  }
 +
 +  /** Builds a client on a pooled transport to {@code address}; the port is read from {@code property} and {@code timeout} is passed to the pool. */
 +  public static <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, Property property, long timeout,
 +      AccumuloConfiguration configuration) throws TTransportException {
 +    int port = configuration.getPort(property);
 +    TTransport pooled = ThriftTransportPool.getInstance().getTransport(address, port, timeout);
 +    return createClient(factory, pooled);
 +  }
 +
 +  /** Returns the transport behind {@code iface}'s input protocol to the pool; a {@code null} client is ignored. */
 +  public static void returnClient(TServiceClient iface) { // Eew... the typing here is horrible
 +    if (iface == null) {
 +      return;
 +    }
 +    TTransport borrowed = iface.getInputProtocol().getTransport();
 +    ThriftTransportPool.getInstance().returnTransport(borrowed);
 +  }
 +
 +  /** Tablet server client on {@code Property.TSERV_CLIENTPORT} with the timeout from {@code Property.GENERAL_RPC_TIMEOUT}. */
 +  public static TabletClientService.Client getTServerClient(String address, AccumuloConfiguration conf) throws TTransportException {
 +    TabletClientService.Client.Factory factory = new TabletClientService.Client.Factory();
 +    return getClient(factory, address, Property.TSERV_CLIENTPORT, Property.GENERAL_RPC_TIMEOUT, conf);
 +  }
 +
 +  /** Tablet server client on {@code Property.TSERV_CLIENTPORT} with an explicit timeout. */
 +  public static TabletClientService.Client getTServerClient(String address, AccumuloConfiguration conf, long timeout) throws TTransportException {
 +    TabletClientService.Client.Factory factory = new TabletClientService.Client.Factory();
 +    return getClient(factory, address, Property.TSERV_CLIENTPORT, timeout, conf);
 +  }
 +
 +  /**
 +   * Runs {@code exec} against a tablet server client for {@code address}. A {@link TTransportException} is logged and the call is retried after a 100 ms
 +   * sleep, without limit; the client is returned to the pool after every attempt.
 +   *
 +   * @throws AccumuloSecurityException
 +   *           translated from a {@link ThriftSecurityException}
 +   * @throws AccumuloException
 +   *           wrapping any other exception
 +   */
 +  public static void execute(String address, AccumuloConfiguration conf, ClientExec<TabletClientService.Client> exec) throws AccumuloException,
 +      AccumuloSecurityException {
 +    for (;;) {
 +      TabletClientService.Client client = null;
 +      try {
 +        client = getTServerClient(address, conf);
 +        exec.execute(client);
 +        return;
 +      } catch (TTransportException tte) {
 +        log.debug("getTServerClient request failed, retrying ... ", tte);
 +        UtilWaitThread.sleep(100);
 +      } catch (ThriftSecurityException e) {
 +        throw new AccumuloSecurityException(e.user, e.code, e);
 +      } catch (Exception e) {
 +        throw new AccumuloException(e);
 +      } finally {
 +        if (client != null)
 +          returnClient(client);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Value-returning variant of {@link #execute(String, AccumuloConfiguration, ClientExec)} with the same retry and exception translation behavior.
 +   *
 +   * @return whatever {@code exec} returns
 +   */
 +  public static <T> T execute(String address, AccumuloConfiguration conf, ClientExecReturn<T,TabletClientService.Client> exec) throws AccumuloException,
 +      AccumuloSecurityException {
 +    for (;;) {
 +      TabletClientService.Client client = null;
 +      try {
 +        client = getTServerClient(address, conf);
 +        return exec.execute(client);
 +      } catch (TTransportException tte) {
 +        log.debug("getTServerClient request failed, retrying ... ", tte);
 +        UtilWaitThread.sleep(100);
 +      } catch (ThriftSecurityException e) {
 +        throw new AccumuloSecurityException(e.user, e.code, e);
 +      } catch (Exception e) {
 +        throw new AccumuloException(e);
 +      } finally {
 +        if (client != null)
 +          returnClient(client);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * create a transport that is not pooled
 +   *
 +   * <p>
 +   * The transport is returned opened. If anything fails after it was created, it is closed before the exception propagates.
 +   */
 +  public static TTransport createTransport(String address, int port, AccumuloConfiguration conf) throws TException {
 +    TTransport transport = null;
 +    boolean handedOut = false;
 +
 +    try {
 +      transport = TTimeoutTransport.create(org.apache.accumulo.core.util.AddressUtil.parseAddress(address, port),
 +          conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +      transport = ThriftUtil.transportFactory().getTransport(transport);
 +      transport.open();
 +      handedOut = true;
 +      return transport;
 +    } catch (IOException ex) {
 +      throw new TTransportException(ex);
 +    } finally {
 +      // only a transport that is not being returned to the caller gets closed here
 +      if (!handedOut && transport != null)
 +        transport.close();
 +    }
 +  }
 +
 +  /**
 +   * create a transport that is not pooled
 +   */
 +  public static TTransport createTransport(InetSocketAddress address, AccumuloConfiguration conf) throws TException {
 +    String host = address.getAddress().getHostAddress();
 +    return createTransport(host, address.getPort(), conf);
 +  }
 +
 +  /** @return the shared framed transport factory, created with a maximum frame size of {@code Integer.MAX_VALUE} */
 +  public static TTransportFactory transportFactory() {
 +    return transportFactory;
 +  }
 +
 +  private static final Map<Integer,TTransportFactory> factoryCache = new HashMap<Integer,TTransportFactory>();
 +
 +  /** @return the cached framed transport factory for {@code maxFrameSize}, created on first request */
 +  public static synchronized TTransportFactory transportFactory(int maxFrameSize) {
 +    TTransportFactory cached = factoryCache.get(maxFrameSize);
 +    if (cached != null) {
 +      return cached;
 +    }
 +    TTransportFactory created = new TFramedTransport.Factory(maxFrameSize);
 +    factoryCache.put(maxFrameSize, created);
 +    return created;
 +  }
 +
 +  /**
 +   * {@code long} convenience for {@link #transportFactory(int)}.
 +   *
 +   * @throws RuntimeException
 +   *           if {@code maxFrameSize} is below 1 or above {@code Integer.MAX_VALUE}
 +   */
 +  public static synchronized TTransportFactory transportFactory(long maxFrameSize) {
 +    boolean outOfRange = maxFrameSize > Integer.MAX_VALUE || maxFrameSize < 1;
 +    if (outOfRange) {
 +      throw new RuntimeException("Thrift transport frames are limited to " + Integer.MAX_VALUE);
 +    }
 +    return transportFactory((int) maxFrameSize);
 +  }
 +
 +  /** @return the shared protocol factory */
 +  public static TProtocolFactory protocolFactory() {
 +    return protocolFactory;
 +  }
-   
-   public static void close() {
-     ThriftTransportPool.close();
-   }
 +}


Mime
View raw message