hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From li...@apache.org
Subject svn commit: r1576884 [1/2] - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift: ./ exceptions/
Date Wed, 12 Mar 2014 20:30:00 GMT
Author: liyin
Date: Wed Mar 12 20:30:00 2014
New Revision: 1576884

URL: http://svn.apache.org/r1576884
Log:
[HBASE-9930] Remove validateResult function which converts the SENTINEL_RESULT…
Summary: This diff removes the validateResult() function, which also converted the SENTINEL_RESULT back to null in the get call. That conversion should not happen there, because SENTINEL_RESULT.isEmpty() is true, which is consistent with the semantics of get.

Test Plan: Run TestFromClientSide

Reviewers: fan, adela, gauravm, daviddeng

Reviewed By: gauravm

CC: hbase-eng@

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseThriftRPC.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThreadBasedThriftClientCache.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftCallStatsReporter.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCache.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheFactory.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientObjectFactory.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/exceptions/
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/exceptions/ThriftHBaseException.java

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseThriftRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseThriftRPC.java?rev=1576884&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseThriftRPC.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseThriftRPC.java Wed Mar 12 20:30:00 2014
@@ -0,0 +1,222 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.thrift;
+
+import com.facebook.swift.service.ThriftClientManager;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
+import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Stack;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Simple RPC mechanism for making client calls. It contains caching of the
+ * client connection, such that we don't create connections multiple times for
+ * the same client.
+ *
+ */
+public class HBaseThriftRPC {
+  private static final Log LOG = LogFactory.getLog(HBaseThriftRPC.class);
+  // One client cache per Configuration; filled lazily by
+  // getClientWithoutWrapper and emptied by clearAll.
+  private static final Map<Configuration, ThriftClientCache> CLIENT_CACHE =
+      new ConcurrentHashMap<>();
+  // Per-thread stack of flags. When the top entry is true, checkIfInMeta
+  // substitutes metaConf for the configuration passed in by the caller.
+  public static ThreadLocal<Stack<Boolean>> isMeta =
+      new ThreadLocal<Stack<Boolean>>() {
+    @Override
+    public Stack<Boolean> initialValue() {
+      return new Stack<>();
+    }
+  };
+
+  // Created on demand by checkIfInMeta via HBaseConfiguration.create(...) from
+  // the first configuration seen while the isMeta flag is set.
+  private static Configuration metaConf = null;
+
+  // When false, clients are made directly by a per-address factory instead of
+  // being taken from CLIENT_CACHE (see setUsePooling).
+  private static boolean USE_POOLING = true;
+  // Per-address factories; only consulted on the non-pooling path.
+  private static ConcurrentHashMap<InetSocketAddress, ThriftClientObjectFactory> factoryMap =
+      new ConcurrentHashMap<>();
+  // Manager handed back to callers on the non-pooling path.
+  private static ThriftClientManager sharedManager = new ThriftClientManager();
+
+  /**
+   * Resets the static RPC state: forgets the meta configuration, empties the
+   * calling thread's isMeta stack, shuts down the client manager of every
+   * cached ThriftClientCache and then removes all cache entries.
+   *
+   * @throws Exception whatever shutDownClientManager() propagates
+   */
+  public static void clearAll() throws Exception {
+    metaConf = null;
+    final Stack<Boolean> metaFlags = isMeta.get();
+    metaFlags.clear();
+    synchronized (CLIENT_CACHE) {
+      for (Map.Entry<Configuration, ThriftClientCache> entry : CLIENT_CACHE.entrySet()) {
+        entry.getValue().shutDownClientManager();
+      }
+      CLIENT_CACHE.clear();
+    }
+  }
+
+  /**
+   * USE WITH CAUTION
+   * Returns the ThriftClientInterface and ThriftClientManager for the given
+   * parameters.
+   * <p>
+   * Without pooling, a new client is made by the per-address
+   * ThriftClientObjectFactory and paired with the shared manager. With pooling,
+   * the client is taken from the ThriftClientCache registered for the
+   * (possibly meta-substituted) configuration; that cache is created on first
+   * use.
+   *
+   * @param addr address of the server to connect to
+   * @param conf configuration used to select (or create) the client cache
+   * @param clazz thrift interface the client should implement
+   * @return the client together with its manager, or {@code null} when the
+   *         pooled cache could not provide a client (the failure is logged);
+   *         callers must check for null
+   * @throws IOException if the non-pooled client cannot be created, or the
+   *         cache's manager cannot be obtained
+   */
+  protected static Pair<ThriftClientInterface, ThriftClientManager> getClientWithoutWrapper(
+      InetSocketAddress addr,
+      Configuration conf,
+      Class<? extends ThriftClientInterface> clazz) throws IOException {
+    if (!USE_POOLING) {
+      ThriftClientObjectFactory factory = factoryMap.get(addr);
+      if (factory == null) {
+        // Build the factory on the shared manager: that is the manager handed
+        // back to the caller below, and allocating a fresh ThriftClientManager
+        // per address would leave managers that nothing ever shuts down.
+        ThriftClientObjectFactory created =
+            new ThriftClientObjectFactory(addr, clazz, sharedManager, conf);
+        // Another thread may have registered a factory first; use the winner so
+        // every caller for this address shares one factory.
+        ThriftClientObjectFactory existing = factoryMap.putIfAbsent(addr, created);
+        factory = (existing != null) ? existing : created;
+      }
+      try {
+        return new Pair<>(factory.makeObject(), sharedManager);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+    conf = checkIfInMeta(conf);
+    ThriftClientCache clientsForConf = CLIENT_CACHE.get(conf);
+    if (clientsForConf == null) {
+      // Re-check under the lock so only one cache is created per configuration.
+      synchronized (CLIENT_CACHE) {
+        clientsForConf = CLIENT_CACHE.get(conf);
+        if (clientsForConf == null) {
+          clientsForConf = (new ThriftClientCacheFactory(conf))
+              .createClientCacheWithPooling();
+          CLIENT_CACHE.put(conf, clientsForConf);
+        }
+      }
+    }
+
+    ThriftClientInterface client;
+    try {
+      client = clientsForConf.getClient(addr, clazz);
+    } catch (Exception e) {
+      LOG.warn("Failed getting Client without Wrapper for address: " + addr, e);
+      // Kept for backward compatibility: failure to get a pooled client is
+      // reported as null rather than as an exception.
+      return null;
+    }
+    try {
+      return new Pair<>(client, clientsForConf.getThriftClientManager());
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Use this instead of getClientWithoutWrapper. Obtains a client for the given
+   * address and wraps it in an HBaseToThriftAdapter.
+   *
+   * @param addr address of the server to connect to
+   * @param conf configuration used to look up the client
+   * @param clazz thrift interface the client should implement
+   * @param options RPC options handed to the adapter
+   * @return an adapter around the obtained client
+   * @throws IOException if no client could be obtained for the address
+   */
+  public static ThriftClientInterface getClient(InetSocketAddress addr,
+      Configuration conf, Class<? extends ThriftClientInterface> clazz,
+      HBaseRPCOptions options) throws IOException {
+    Pair<ThriftClientInterface, ThriftClientManager> interfaceAndManager = getClientWithoutWrapper(
+        addr, conf, clazz);
+    // getClientWithoutWrapper reports a failed pooled lookup as null; surface
+    // that as the declared IOException instead of a NullPointerException.
+    if (interfaceAndManager == null) {
+      throw new IOException("Could not obtain a thrift client for address: " + addr);
+    }
+    return new HBaseToThriftAdapter(interfaceAndManager.getFirst(),
+        interfaceAndManager.getSecond(), addr, conf, clazz, options);
+  }
+
+  /**
+   * Refreshes the connection i.e., closes and returns a new connection and the
+   * connection manager We use the auxiliary parameters in this function call to
+   * uniquely identify the given ThriftClientInterface.
+   *
+   * @param inetSocketAddress
+   * @param conf
+   * @param thriftClient
+   * @param clientInterface
+   * @return
+   * @throws IOException
+   */
+  protected static Pair<ThriftClientInterface, ThriftClientManager> refreshConnection(
+      InetSocketAddress inetSocketAddress,
+      Configuration conf, ThriftClientInterface thriftClient,
+      Class<? extends ThriftClientInterface> clientInterface) throws IOException {
+    if (!USE_POOLING) {
+      try {
+        thriftClient.close();
+      } catch (Exception e) {
+        LOG.warn("Unable to close client because: ", e);
+      }
+      return new Pair<>(null, null);
+    }
+    conf = checkIfInMeta(conf);
+    ThriftClientCache clientsForConf = CLIENT_CACHE.get(conf);
+    try {
+      if (clientsForConf == null) {
+        LOG.error("Client cache pool for current configuration is null.");
+        thriftClient.close();
+      } else {
+        clientsForConf.close(inetSocketAddress, clientInterface, thriftClient);
+      }
+      LOG.debug("Refreshing client connection due to an error in the channel. Remote address: " + inetSocketAddress);
+      return getClientWithoutWrapper(inetSocketAddress, conf, clientInterface);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  private static Configuration checkIfInMeta(Configuration inputConf) {
+    Configuration conf = inputConf;
+    if (!isMeta.get().isEmpty() && isMeta.get().peek()) {
+      if (metaConf == null) {
+        metaConf = (HBaseConfiguration.create(inputConf));
+      }
+      conf = metaConf;
+    }
+    return conf;
+  }
+
+  /**
+   * Gives a client back after use. With pooling the client is returned to the
+   * cache registered for the (possibly meta-substituted) configuration, or
+   * dropped silently if no such cache exists. Without pooling the client is
+   * closed; a failure to close is only logged.
+   *
+   * @param server the client being given back
+   * @param addr remote address the client is connected to
+   * @param conf configuration the client was obtained with
+   * @param clazz thrift interface of the client
+   * @throws Exception whatever the cache's putBackClient propagates
+   */
+  public static void putBackClient(ThriftClientInterface server,
+      InetSocketAddress addr, Configuration conf,
+      Class<? extends ThriftClientInterface> clazz) throws Exception {
+    if (USE_POOLING) {
+      final ThriftClientCache cache = CLIENT_CACHE.get(checkIfInMeta(conf));
+      if (cache != null) {
+        cache.putBackClient(server, addr, clazz);
+      }
+      return;
+    }
+
+    try {
+      // FIXME
+      // When using async, without connection pooling, because as per @fan,
+      // we would just close the connection before the async call finishes.
+      server.close();
+    } catch (Exception closeFailure) {
+      LOG.warn("Could not close connection to " + addr.toString(), closeFailure);
+    }
+  }
+
+  /**
+   * Switches between pooled clients (true) and per-call clients made by the
+   * per-address factories (false).
+   *
+   * @param b the new value of the pooling flag
+   */
+  public static void setUsePooling(boolean b) {
+    HBaseThriftRPC.USE_POOLING = b;
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java?rev=1576884&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java Wed Mar 12 20:30:00 2014
@@ -0,0 +1,1171 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.thrift;
+
+import com.facebook.nifty.header.transport.THeaderTransport;
+import com.facebook.swift.service.RuntimeTApplicationException;
+import com.facebook.swift.service.ThriftClientManager;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.MultiAction;
+import org.apache.hadoop.hbase.client.MultiPut;
+import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.MultiResponse;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TRowMutations;
+import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
+import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
+import org.apache.hadoop.hbase.ipc.ThriftHRegionInterface;
+import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
+import org.apache.hadoop.hbase.master.AssignmentPlan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC.VersionIncompatible;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * The client should use this class to communicate to the server via thrift
+ *
+ */
+public class HBaseToThriftAdapter implements HRegionInterface {
+  public static final Log LOG = LogFactory.getLog(HBaseToThriftAdapter.class);
+  // Client currently held by this adapter. postProcess() gives it back and
+  // sets this to null; preProcess() fetches a new one when it is null.
+  private ThriftHRegionInterface connection;
+  // Manager paired with the current connection; cleared together with it.
+  private ThriftClientManager clientManager;
+  // The next three fields are the arguments used to (re)acquire a client.
+  private InetSocketAddress addr;
+  private Configuration conf;
+  private Class<? extends ThriftClientInterface> clazz;
+  // RPC options for calls made through this adapter; may be null
+  // (setHeader() checks for that).
+  private HBaseRPCOptions options;
+  // Read once from HConstants.USE_HEADER_PROTOCOL in the constructor.
+  private boolean useHeaderProtocol;
+
+  public HBaseToThriftAdapter(ThriftClientInterface connection,
+      ThriftClientManager clientManager,
+      InetSocketAddress addr, Configuration conf,
+      Class<? extends ThriftClientInterface> clazz,
+      HBaseRPCOptions options) {
+    this.connection = (ThriftHRegionInterface)connection;
+    this.clientManager = clientManager;
+    this.addr = addr;
+    this.conf = conf;
+    this.clazz = clazz;
+    this.options = options;
+    this.useHeaderProtocol = conf.getBoolean(HConstants.USE_HEADER_PROTOCOL,
+      HConstants.DEFAULT_USE_HEADER_PROTOCOL);
+  }
+
+  /**
+   * Call this function as demonstrated in
+   * {@link HBaseToThriftAdapter#getRegionInfo(byte[])}. Given the captured
+   * exception it always throws:
+   * <ul>
+   * <li>a TApplicationException or RuntimeTApplicationException is rethrown
+   * wrapped in a RuntimeException, without touching the connection;</li>
+   * <li>any other exception causes the connection to be refreshed through
+   * HBaseThriftRPC and is then rethrown wrapped in an IOException.</li>
+   * </ul>
+   *
+   * @param e : The captured exception.
+   * @throws IOException wrapping {@code e}, or propagated from the refresh
+   */
+  public void refreshConnectionAndThrowIOException(Exception e)
+      throws IOException {
+    if (e instanceof TApplicationException
+        || e instanceof RuntimeTApplicationException) {
+      throw new RuntimeException(e);
+    }
+    Pair<ThriftClientInterface, ThriftClientManager> interfaceAndManager = HBaseThriftRPC
+        .refreshConnection(this.addr, this.conf, this.connection, this.clazz);
+    if (interfaceAndManager == null) {
+      // The refresh could not supply a replacement client. Drop the stale
+      // references so preProcess() acquires a fresh pair on the next call,
+      // and make sure the original failure (not an NPE) reaches the caller.
+      this.connection = null;
+      this.clientManager = null;
+    } else {
+      this.connection = (ThriftHRegionInterface) interfaceAndManager.getFirst();
+      this.clientManager = interfaceAndManager.getSecond();
+    }
+    throw new IOException(e);
+  }
+
+  /**
+   * Unchecked variant of refreshConnectionAndThrowIOException for interface
+   * methods that do not declare IOException: the IOException it raises is
+   * rewrapped in a RuntimeException.
+   */
+  private void refreshConnectionAndThrowRuntimeException(
+      Exception e) {
+    try {
+      refreshConnectionAndThrowIOException(e);
+    } catch (IOException wrapped) {
+      throw new RuntimeException(wrapped);
+    }
+  }
+
+  /**
+   * Sends call data from the client to the server in a thrift header. The
+   * header is only set when the output transport is a THeaderTransport and
+   * the options request profiling; a non-header transport is logged as an
+   * error.
+   *
+   * @throws Exception if the protocol lookup or serialization fails
+   */
+  public void setHeader() throws Exception {
+    final TTransport outputTransport =
+        clientManager.getOutputProtocol(connection).getTransport();
+    if (!(outputTransport instanceof THeaderTransport)) {
+      LOG.error("output transport for client was not THeaderTransport, client cannot send headers");
+      return;
+    }
+    if (options == null || !options.getRequestProfiling()) {
+      return;
+    }
+    final String serializedCall =
+        Bytes.writeThriftBytesAndGetString(new Call(options), Call.class);
+    ((THeaderTransport) outputTransport).setHeader(
+        HConstants.THRIFT_HEADER_FROM_CLIENT, serializedCall);
+  }
+
+  /**
+   * Prepares the adapter for one RPC: acquires a client and manager from
+   * HBaseThriftRPC if either is missing, then, when the header protocol is
+   * enabled, tries to attach the client header. A failure to send the header
+   * is logged and otherwise ignored.
+   *
+   * @throws RuntimeException if no client could be acquired
+   */
+  private void preProcess() {
+    if (this.connection == null || this.clientManager == null) {
+      try {
+        Pair<ThriftClientInterface, ThriftClientManager> clientAndManager = HBaseThriftRPC
+            .getClientWithoutWrapper(addr, conf, clazz);
+        // A null pair means the pooled cache could not supply a client; report
+        // that explicitly instead of failing with a NullPointerException.
+        if (clientAndManager == null) {
+          throw new IOException("Could not obtain a thrift client for address: " + addr);
+        }
+        this.connection = (ThriftHRegionInterface) clientAndManager.getFirst();
+        this.clientManager = clientAndManager.getSecond();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    if (useHeaderProtocol) {
+      try {
+        setHeader();
+      } catch (Exception e) {
+        // Best effort: the call proceeds without the header. Log through the
+        // logger (with the cause) rather than printing to stderr.
+        LOG.error("Header could not be sent", e);
+      }
+    }
+  }
+
+  /**
+   * Read data that the server has sent to the client. If the input transport
+   * is a THeaderTransport and carries the server header, the header is
+   * decoded and its profiling data stored on the options. Does nothing when
+   * there is no client manager.
+   * TODO: test how it works with async calls
+   */
+  public void readHeader() {
+    if (clientManager == null) {
+      return;
+    }
+    TTransport inputTransport = clientManager.getInputProtocol(connection)
+        .getTransport();
+    if (inputTransport instanceof THeaderTransport) {
+      // Cast the transport that was just type-checked; the headers sent by the
+      // server are read from the input side.
+      THeaderTransport headerTransport = (THeaderTransport) inputTransport;
+      String dataString = headerTransport.getReadHeaders().get(HConstants.THRIFT_HEADER_FROM_SERVER);
+      // options may be null (setHeader() guards for it as well); without it
+      // there is nowhere to store the profiling result.
+      if (dataString != null && this.options != null) {
+        byte[] dataBytes = Bytes.hexToBytes(dataString);
+        try {
+          Call call = Bytes.readThriftBytes(dataBytes, Call.class);
+          this.options.profilingResult = call.getProfilingData();
+        } catch (Exception e) {
+          LOG.error("data deserialization didn't succeed", e);
+        }
+      }
+    } else {
+      LOG.error("input transport was not THeaderTransport, client cannot read headers");
+    }
+  }
+
+  /**
+   * Finishes one RPC: reads the server header when the header protocol is
+   * enabled, then gives the client back to HBaseThriftRPC and forgets it, so
+   * the next call acquires a fresh one in preProcess().
+   *
+   * @throws RuntimeException wrapping any failure from reading the header or
+   *         returning the client
+   */
+  private void postProcess() {
+    try {
+      try {
+        if (this.useHeaderProtocol) {
+          readHeader();
+        }
+      } finally {
+        // Always release the client, even if reading the header failed;
+        // otherwise it would never be handed back. Clear the fields first so
+        // this adapter never reuses a client it has tried to give back.
+        ThriftHRegionInterface used = this.connection;
+        this.connection = null;
+        this.clientManager = null;
+        // connection is null after a refresh that produced no replacement;
+        // there is nothing to return in that case.
+        if (used != null) {
+          HBaseThriftRPC.putBackClient(used, this.addr, this.conf, this.clazz);
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Rethrows {@code e} if it is an IOException; otherwise returns normally.
+   */
+  private void handleIOException(Exception e) throws IOException {
+    if (!(e instanceof IOException)) {
+      return;
+    }
+    throw (IOException) e;
+  }
+
+  /**
+   * Rethrows {@code e} if it is a NotServingRegionException; otherwise
+   * returns normally.
+   */
+  private void handleNotServingRegionException(Exception e)
+    throws NotServingRegionException {
+    if (!(e instanceof NotServingRegionException)) {
+      return;
+    }
+    throw (NotServingRegionException) e;
+  }
+
+  /**
+   * Rethrows {@code e} if it is an IllegalArgumentException; otherwise
+   * returns normally.
+   */
+  private void handleIllegalArgumentException(Exception e)
+    throws IllegalArgumentException {
+    if (!(e instanceof IllegalArgumentException)) {
+      return;
+    }
+    throw (IllegalArgumentException) e;
+  }
+
+  /** Not available over thrift; always throws UnsupportedOperationException. */
+  @Override
+  public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2)
+      throws IOException {
+    final String reason = "Method is not supported for thrift";
+    throw new UnsupportedOperationException(reason);
+  }
+
+  /** Not available over thrift; always throws UnsupportedOperationException. */
+  @Override
+  public long getProtocolVersion(String arg0, long arg1)
+      throws VersionIncompatible, IOException {
+    final String reason = "Method is not supported for thrift";
+    throw new UnsupportedOperationException(reason);
+  }
+
+  /**
+   * Forwards stopForRestart() to the remote side. Any failure refreshes the
+   * connection and surfaces as a RuntimeException.
+   */
+  @Override
+  public void stopForRestart() {
+    preProcess();
+    try {
+      this.connection.stopForRestart();
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards isStopped() to the remote side. Any failure refreshes the
+   * connection and surfaces as a RuntimeException.
+   */
+  @Override
+  public boolean isStopped() {
+    preProcess();
+    try {
+      final boolean stopped = this.connection.isStopped();
+      return stopped;
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+      // Not reached in practice: the helper above always throws.
+      return false;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /** Not available over thrift; always throws UnsupportedOperationException. */
+  @Override
+  public void close() throws Exception {
+    final String reason = "Method is not supported for thrift";
+    throw new UnsupportedOperationException(reason);
+  }
+
+  /**
+   * Forwards getRegionInfo to the remote side. A server-side
+   * NotServingRegionException is rethrown as-is; any other server-side
+   * exception is logged and wrapped in a RuntimeException. Other failures
+   * refresh the connection and surface as a RuntimeException.
+   */
+  @Override
+  public HRegionInfo getRegionInfo(byte[] regionName)
+      throws NotServingRegionException {
+    preProcess();
+    try {
+      final HRegionInfo info = this.connection.getRegionInfo(regionName);
+      return info;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverSide = thriftError.getServerJavaException();
+      handleNotServingRegionException(serverSide);
+      LOG.warn("Unexpected Exception: " + serverSide);
+      throw new RuntimeException(serverSide);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards getClosestRowBefore to the remote side and maps a sentinel
+   * result to null. A server-side IOException is rethrown as-is; any other
+   * server-side exception is logged and wrapped in a RuntimeException. Other
+   * failures refresh the connection and surface as an IOException.
+   */
+  @Override
+  public Result getClosestRowBefore(byte[] regionName, byte[] row, byte[] family)
+      throws IOException {
+    preProcess();
+    try {
+      final Result closest = this.connection.getClosestRowBefore(regionName, row, family);
+      return closest.isSentinelResult() ? null : closest;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverSide = thriftError.getServerJavaException();
+      handleIOException(serverSide);
+      LOG.warn("Unexpected Exception: " + serverSide);
+      throw new RuntimeException(serverSide);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Starts the asynchronous variant of getClosestRowBefore and returns its
+   * future. Note that postProcess() runs as soon as the future has been
+   * obtained, not when it completes.
+   */
+  public ListenableFuture<Result> getClosestRowBeforeAsync(byte[] regionName, byte[] row, byte[] family) {
+    preProcess();
+    try {
+      final ListenableFuture<Result> pending =
+          this.connection.getClosestRowBeforeAsync(regionName, row, family);
+      return pending;
+    } finally {
+      postProcess();
+    }
+  }
+
+  // TODO: we will decide whether to remove it from HRegionInterface in the future
+  @Override
+  public HRegion[] getOnlineRegionsAsArray() {
+    LOG.error("Intentionally not implemented because it's only called internally.");
+    return null;
+  }
+
+  /**
+   * Forwards flushRegion(regionName) to the remote side. A server-side
+   * IllegalArgumentException or IOException is rethrown as-is; any other
+   * server-side exception is logged and rethrown wrapped in a
+   * RuntimeException, as {@link #get(byte[], Get)} does, so that a failed
+   * flush is never reported as success. Other failures refresh the
+   * connection and surface as an IOException.
+   */
+  @Override
+  public void flushRegion(byte[] regionName) throws IllegalArgumentException,
+      IOException {
+    preProcess();
+    try {
+      connection.flushRegion(regionName);
+    } catch (ThriftHBaseException te) {
+      Exception e = te.getServerJavaException();
+      handleIllegalArgumentException(e);
+      handleIOException(e);
+      LOG.warn("Unexpected Exception: " + e);
+      // Do not swallow: returning normally would tell the caller it worked.
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      refreshConnectionAndThrowIOException(e);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards flushRegion(regionName, ifOlderThanTS) to the remote side. A
+   * server-side IllegalArgumentException or IOException is rethrown as-is;
+   * any other server-side exception is logged and rethrown wrapped in a
+   * RuntimeException, as {@link #get(byte[], Get)} does, so that a failed
+   * flush is never reported as success. Other failures refresh the
+   * connection and surface as an IOException.
+   */
+  @Override
+  public void flushRegion(byte[] regionName, long ifOlderThanTS)
+      throws IllegalArgumentException, IOException {
+    preProcess();
+    try {
+      connection.flushRegion(regionName, ifOlderThanTS);
+    } catch (ThriftHBaseException te) {
+      Exception e = te.getServerJavaException();
+      handleIllegalArgumentException(e);
+      handleIOException(e);
+      LOG.warn("Unexpected Exception: " + e);
+      // Do not swallow: returning normally would tell the caller it worked.
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      refreshConnectionAndThrowIOException(e);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards getLastFlushTime to the remote side. Any failure refreshes the
+   * connection and surfaces as a RuntimeException.
+   */
+  @Override
+  public long getLastFlushTime(byte[] regionName) {
+    preProcess();
+    try {
+      final long lastFlush = this.connection.getLastFlushTime(regionName);
+      return lastFlush;
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+      return -1;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Fetches the last flush times from the remote side and repackages the
+   * returned map as a MapWritable of BytesWritable keys to LongWritable
+   * values. Any failure refreshes the connection and surfaces as a
+   * RuntimeException.
+   */
+  @Override
+  public MapWritable getLastFlushTimes() {
+    preProcess();
+    try {
+      final Map<byte[], Long> flushTimes = this.connection.getLastFlushTimes();
+      final MapWritable converted = new MapWritable();
+      for (Entry<byte[], Long> flushTime : flushTimes.entrySet()) {
+        final BytesWritable key = new BytesWritable(flushTime.getKey());
+        final LongWritable value = new LongWritable(flushTime.getValue());
+        converted.put(key, value);
+      }
+      return converted;
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+      throw new RuntimeException(failure);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards getCurrentTimeMillis to the remote side. Any failure refreshes
+   * the connection and surfaces as a RuntimeException.
+   */
+  @Override
+  public long getCurrentTimeMillis() {
+    preProcess();
+    try {
+      final long remoteNow = this.connection.getCurrentTimeMillis();
+      return remoteNow;
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+      return -1;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards getStartCode to the remote side. Any failure refreshes the
+   * connection and surfaces as a RuntimeException.
+   */
+  @Override
+  public long getStartCode() {
+    preProcess();
+    try {
+      final long startCode = this.connection.getStartCode();
+      return startCode;
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+      return -1;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards getStoreFileList for a single column family to the remote side.
+   * A server-side IllegalArgumentException is rethrown as-is; any other
+   * server-side exception is logged and wrapped in a RuntimeException. Other
+   * failures refresh the connection and surface as a RuntimeException.
+   */
+  @Override
+  public List<String> getStoreFileList(byte[] regionName, byte[] columnFamily)
+      throws IllegalArgumentException {
+    preProcess();
+    try {
+      final List<String> storeFiles =
+          this.connection.getStoreFileList(regionName, columnFamily);
+      return storeFiles;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverSide = thriftError.getServerJavaException();
+      handleIllegalArgumentException(serverSide);
+      LOG.warn("Unexpected Exception: " + serverSide);
+      throw new RuntimeException(serverSide);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Copies the column-family array into a list and forwards it to
+   * getStoreFileListForColumnFamilies on the remote side. A server-side
+   * IllegalArgumentException is rethrown as-is; any other server-side
+   * exception is logged and wrapped in a RuntimeException. Other failures
+   * refresh the connection and surface as a RuntimeException.
+   */
+  @Override
+  public List<String> getStoreFileList(byte[] regionName,
+      byte[][] columnFamilies) throws IllegalArgumentException {
+    preProcess();
+    try {
+      final List<byte[]> families = new ArrayList<>();
+      for (byte[] family : columnFamilies) {
+        families.add(family);
+      }
+      return this.connection.getStoreFileListForColumnFamilies(regionName, families);
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverSide = thriftError.getServerJavaException();
+      handleIllegalArgumentException(serverSide);
+      LOG.warn("Unexpected Exception: " + serverSide);
+      throw new RuntimeException(serverSide);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards to getStoreFileListForAllColumnFamilies on the remote side. A
+   * server-side IllegalArgumentException is rethrown as-is; any other
+   * server-side exception is logged and wrapped in a RuntimeException. Other
+   * failures refresh the connection and surface as a RuntimeException.
+   */
+  @Override
+  public List<String> getStoreFileList(byte[] regionName)
+      throws IllegalArgumentException {
+    preProcess();
+    try {
+      final List<String> storeFiles =
+          this.connection.getStoreFileListForAllColumnFamilies(regionName);
+      return storeFiles;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverSide = thriftError.getServerJavaException();
+      handleIllegalArgumentException(serverSide);
+      LOG.warn("Unexpected Exception: " + serverSide);
+      throw new RuntimeException(serverSide);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards getHLogsList to the remote side. A server-side IOException is
+   * rethrown as-is; any other server-side exception is logged and wrapped in
+   * a RuntimeException. Other failures refresh the connection and surface as
+   * an IOException.
+   */
+  @Override
+  public List<String> getHLogsList(boolean rollCurrentHLog) throws IOException {
+    preProcess();
+    try {
+      final List<String> hlogs = this.connection.getHLogsList(rollCurrentHLog);
+      return hlogs;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverSide = thriftError.getServerJavaException();
+      handleIOException(serverSide);
+      LOG.warn("Unexpected Exception: " + serverSide);
+      throw new RuntimeException(serverSide);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards a single Get to the remote side and returns its Result
+   * unchanged. A server-side IOException is rethrown as-is; any other
+   * server-side exception is logged and wrapped in a RuntimeException. Other
+   * failures refresh the connection and surface as an IOException.
+   */
+  @Override
+  public Result get(byte[] regionName, Get get) throws IOException {
+    preProcess();
+    try {
+      final Result fetched = this.connection.get(regionName, get);
+      return fetched;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverSide = thriftError.getServerJavaException();
+      handleIOException(serverSide);
+      LOG.warn("Unexpected Exception: " + serverSide);
+      throw new RuntimeException(serverSide);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards a batch of Gets to getRows on the remote side and returns the
+   * results as an array. A server-side IOException is rethrown as-is; any
+   * other server-side exception is logged and wrapped in a RuntimeException.
+   * Other failures refresh the connection and surface as an IOException.
+   */
+  @Override
+  public Result[] get(byte[] regionName, List<Get> gets) throws IOException {
+    preProcess();
+    try {
+      final List<Result> rows = this.connection.getRows(regionName, gets);
+      final Result[] asArray = new Result[rows.size()];
+      return rows.toArray(asArray);
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverSide = thriftError.getServerJavaException();
+      handleIOException(serverSide);
+      LOG.warn("Unexpected Exception: " + serverSide);
+      throw new RuntimeException(serverSide);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Starts the asynchronous variant of get and returns its future. Note that
+   * postProcess() runs as soon as the future has been obtained, not when it
+   * completes.
+   */
+  public ListenableFuture<Result> getAsync(byte[] regionName, Get get) {
+    preProcess();
+    try {
+      final ListenableFuture<Result> pending = this.connection.getAsync(regionName, get);
+      return pending;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards exists to the remote side. A server-side IOException is
+   * rethrown as-is; any other server-side exception is logged and reported
+   * as {@code false}. Other failures refresh the connection and surface as an
+   * IOException. The client is always handed back through postProcess().
+   */
+  @Override
+  public boolean exists(byte[] regionName, Get get) throws IOException {
+    preProcess();
+    try {
+      return connection.exists(regionName, get);
+    } catch (ThriftHBaseException te) {
+      Exception e = te.getServerJavaException();
+      handleIOException(e);
+      LOG.warn("Unexpected Exception: " + e);
+      return false;
+    } catch (Exception e) {
+      refreshConnectionAndThrowIOException(e);
+      return false;
+    } finally {
+      // Was missing: without it the client acquired in preProcess() is never
+      // given back, unlike every other RPC method in this class.
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards a single Put to the remote side. A server-side IOException is
+   * rethrown as-is; any other server-side exception is logged and rethrown
+   * wrapped in a RuntimeException, as {@link #get(byte[], Get)} does, so that
+   * a failed write is never reported as success. Other failures refresh the
+   * connection and surface as an IOException.
+   */
+  @Override
+  public void put(byte[] regionName, Put put) throws IOException {
+    preProcess();
+    try {
+      connection.put(regionName, put);
+    } catch (ThriftHBaseException te) {
+      Exception e = te.getServerJavaException();
+      handleIOException(e);
+      LOG.warn("Unexpected Exception: " + e);
+      // Do not swallow: returning normally would tell the caller the put
+      // was applied.
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      refreshConnectionAndThrowIOException(e);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards a batch of Puts to putRows on the remote side and returns its
+   * result. A server-side IOException is rethrown as-is; any other
+   * server-side exception is logged and rethrown wrapped in a
+   * RuntimeException rather than being turned into a return value. Other
+   * failures refresh the connection and surface as an IOException.
+   */
+  @Override
+  public int put(byte[] regionName, List<Put> puts) throws IOException {
+    preProcess();
+    try {
+      return connection.putRows(regionName, puts);
+    } catch (ThriftHBaseException te) {
+      Exception e = te.getServerJavaException();
+      handleIOException(e);
+      LOG.warn("Unexpected Exception: " + e);
+      // NOTE(review): -1 appears to be the "all puts succeeded" value of the
+      // HRegionInterface contract - confirm. Returning it here would report
+      // a server-side failure as complete success, so rethrow instead.
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      refreshConnectionAndThrowIOException(e);
+      return -1;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards a single Delete to processDelete on the remote side. A
+   * server-side IOException is rethrown as-is; any other server-side
+   * exception is logged and rethrown wrapped in a RuntimeException, as
+   * {@link #get(byte[], Get)} does, so that a failed delete is never reported
+   * as success. Other failures refresh the connection and surface as an
+   * IOException.
+   */
+  @Override
+  public void delete(byte[] regionName, Delete delete) throws IOException {
+    preProcess();
+    try {
+      connection.processDelete(regionName, delete);
+    } catch (ThriftHBaseException te) {
+      Exception e = te.getServerJavaException();
+      handleIOException(e);
+      LOG.warn("Unexpected Exception: " + e);
+      // Do not swallow: returning normally would tell the caller the delete
+      // was applied.
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      refreshConnectionAndThrowIOException(e);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Starts the asynchronous variant of delete and returns its future. Note
+   * that postProcess() runs as soon as the future has been obtained, not when
+   * it completes.
+   */
+  public ListenableFuture<Void> deleteAsync(final byte[] regionName, final Delete delete) {
+    preProcess();
+    try {
+      final ListenableFuture<Void> pending = this.connection.deleteAsync(regionName, delete);
+      return pending;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards a batch of Deletes to processListOfDeletes on the remote side
+   * and returns its result. A server-side IOException is rethrown as-is; any
+   * other server-side exception is logged and rethrown wrapped in a
+   * RuntimeException rather than being turned into a return value. Other
+   * failures refresh the connection and surface as an IOException.
+   */
+  @Override
+  public int delete(byte[] regionName, List<Delete> deletes)
+      throws IOException {
+    preProcess();
+    try {
+      return connection.processListOfDeletes(regionName, deletes);
+    } catch (ThriftHBaseException te) {
+      Exception e = te.getServerJavaException();
+      handleIOException(e);
+      LOG.warn("Unexpected Exception: " + e);
+      // NOTE(review): -1 appears to be the "all deletes succeeded" value of
+      // the HRegionInterface contract - confirm. Returning it here would
+      // report a server-side failure as complete success, so rethrow instead.
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      refreshConnectionAndThrowIOException(e);
+      return -1;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards a check-and-put to connection.checkAndPut(), bracketed by
+   * preProcess() and postProcess().
+   *
+   * If the server-side exception carried by a ThriftHBaseException is not
+   * rethrown by handleIOException, it is logged at WARN level and false is
+   * returned.
+   *
+   * @return the value returned by the connection, or false on the fallback
+   *         paths described above
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   */
+  @Override
+  public boolean checkAndPut(byte[] regionName, byte[] row, byte[] family,
+      byte[] qualifier, byte[] value, Put put) throws IOException {
+    preProcess();
+    try {
+      final boolean applied = connection.checkAndPut(
+          regionName, row, family, qualifier, value, put);
+      return applied;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+      return false;
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return false;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards a check-and-delete to connection.checkAndDelete(), bracketed by
+   * preProcess() and postProcess().
+   *
+   * If the server-side exception carried by a ThriftHBaseException is not
+   * rethrown by handleIOException, it is logged at WARN level and false is
+   * returned.
+   *
+   * @return the value returned by the connection, or false on the fallback
+   *         paths described above
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   */
+  @Override
+  public boolean checkAndDelete(byte[] regionName, byte[] row, byte[] family,
+      byte[] qualifier, byte[] value, Delete delete) throws IOException {
+    preProcess();
+    try {
+      final boolean applied = connection.checkAndDelete(
+          regionName, row, family, qualifier, value, delete);
+      return applied;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+      return false;
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return false;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards an increment to connection.incrementColumnValue(), bracketed by
+   * preProcess() and postProcess().
+   *
+   * @return the value returned by the connection; -1 only if
+   *         refreshConnectionAndThrowIOException returns without throwing
+   * @throws IOException whatever handleIOException or
+   *         refreshConnectionAndThrowIOException raise for the failure
+   * @throws RuntimeException if the server reported an exception that
+   *         handleIOException did not rethrow
+   */
+  @Override
+  public long incrementColumnValue(byte[] regionName, byte[] row,
+      byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
+      throws IOException {
+    preProcess();
+    try {
+      return connection.incrementColumnValue(regionName, row, family, qualifier, amount, writeToWAL);
+    } catch (ThriftHBaseException te) {
+      Exception e = te.getServerJavaException();
+      handleIOException(e);
+      // Getting here means handleIOException did not rethrow. A made-up -1
+      // is indistinguishable from a real counter value, so raise the failure
+      // the same way next(), multiAction() and multiPut() do.
+      LOG.warn("Unexpected Exception: " + e);
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      refreshConnectionAndThrowIOException(e);
+      return -1;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Opens a scanner through connection.openScanner(), bracketed by
+   * preProcess() and postProcess().
+   *
+   * If the server-side exception carried by a ThriftHBaseException is not
+   * rethrown by handleIOException, it is logged at WARN level and -1 is
+   * returned.
+   *
+   * @param regionName name of the region to scan
+   * @param scan the scan specification
+   * @return the value returned by the connection, or -1 on the fallback
+   *         paths described above
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   */
+  @Override
+  public long openScanner(byte[] regionName, Scan scan) throws IOException {
+    preProcess();
+    try {
+      final long openedId = connection.openScanner(regionName, scan);
+      return openedId;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+      return -1;
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return -1;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Converts the given {@link RowMutations} with
+   * TRowMutations.Builder.createFromRowMutations() and sends it through
+   * connection.mutateRow(). preProcess() runs before the call and
+   * postProcess() always runs afterwards.
+   *
+   * @param regionName name of the target region
+   * @param arm the row mutations to apply
+   * @throws IOException whatever handleIOException or
+   *         refreshConnectionAndThrowIOException raise for the failure
+   * @throws RuntimeException if the server reported an exception that
+   *         handleIOException did not rethrow
+   */
+  @Override
+  public void mutateRow(byte[] regionName, RowMutations arm)
+      throws IOException {
+    preProcess();
+    try {
+      connection.mutateRow(regionName, TRowMutations.Builder.createFromRowMutations(arm));
+    } catch (ThriftHBaseException te) {
+      Exception e = te.getServerJavaException();
+      handleIOException(e);
+      // Getting here means handleIOException did not rethrow. Returning
+      // normally would tell the caller that the mutation succeeded, so
+      // surface the failure the same way next() and multiAction() do.
+      LOG.warn("Unexpected Exception: " + e);
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      refreshConnectionAndThrowIOException(e);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Asynchronous variant of mutateRow. The {@link RowMutations} is converted
+   * with TRowMutations.Builder.createFromRowMutations() and passed to
+   * connection.mutateRowAsync(). An IOException raised while doing so is
+   * not thrown but returned as an already-failed future.
+   *
+   * @param regionName name of the target region
+   * @param arm the row mutations to apply
+   * @return the connection's future, or a failed future wrapping the
+   *         IOException
+   */
+  public ListenableFuture<Void> mutateRowAsync(byte[] regionName, RowMutations arm) {
+    preProcess();
+    try {
+      final TRowMutations converted =
+          TRowMutations.Builder.createFromRowMutations(arm);
+      return connection.mutateRowAsync(regionName, converted);
+    } catch (IOException conversionOrCallFailure) {
+      return Futures.immediateFailedFuture(conversionOrCallFailure);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Converts every {@link RowMutations} in the list with
+   * TRowMutations.Builder.createFromRowMutations() and sends the converted
+   * list through connection.mutateRows(). preProcess() runs before the call
+   * and postProcess() always runs afterwards.
+   *
+   * @param regionName name of the target region
+   * @param armList the row mutations to apply
+   * @throws IOException whatever handleIOException or
+   *         refreshConnectionAndThrowIOException raise for the failure
+   * @throws RuntimeException if the server reported an exception that
+   *         handleIOException did not rethrow
+   */
+  @Override
+  public void mutateRow(byte[] regionName, List<RowMutations> armList)
+      throws IOException {
+    preProcess();
+    try {
+      List<TRowMutations> listOfMutations = new ArrayList<>();
+      for (RowMutations mutation : armList) {
+        listOfMutations.add(TRowMutations.Builder.createFromRowMutations(mutation));
+      }
+      connection.mutateRows(regionName, listOfMutations);
+    } catch (ThriftHBaseException te) {
+      Exception e = te.getServerJavaException();
+      handleIOException(e);
+      // Getting here means handleIOException did not rethrow. Returning
+      // normally would tell the caller that the mutations succeeded, so
+      // surface the failure the same way next() and multiAction() do.
+      LOG.warn("Unexpected Exception: " + e);
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      refreshConnectionAndThrowIOException(e);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Fetches one result for the scanner through connection.next(), bracketed
+   * by preProcess() and postProcess().
+   *
+   * @param scannerId id of the scanner to advance
+   * @return the value returned by the connection; null only if
+   *         refreshConnectionAndThrowIOException returns without throwing
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   * @throws RuntimeException wrapping a server-side exception that
+   *         handleIOException did not rethrow
+   */
+  @Override
+  @Deprecated
+  public Result next(long scannerId) throws IOException {
+    preProcess();
+    try {
+      final Result row = connection.next(scannerId);
+      return row;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+      throw new RuntimeException(serverException);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Maps the new terminal condition of the Result stream back to the original
+   * format.
+   * {@link Result#SENTINEL_RESULT_ARRAY} as the return value to
+   * {@link this#next(long, int)} represents the terminal condition to the end
+   * of scan.
+   * Previously, null used to represent the same.
+   * This method maps the new format to the old format to maintain backward
+   * compatibility.
+   * @param res
+   * @return
+   */
+  private Result[] validateResults(Result[] res) {
+    if (res == null) return null;
+    if (res.length == 1) {
+      return res[0].isSentinelResult() ? null : res;
+    }
+    return res;
+  }
+
+  /**
+   * Fetches up to numberOfRows results for the scanner through
+   * connection.nextRows(), copies them into an array and passes that array
+   * through validateResults() before returning it.
+   *
+   * @param scannerId id of the scanner to advance
+   * @param numberOfRows number of rows requested from the connection
+   * @return the array produced by validateResults(); null also if
+   *         refreshConnectionAndThrowIOException returns without throwing
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   * @throws RuntimeException wrapping a server-side exception that
+   *         handleIOException did not rethrow
+   */
+  @Override
+  public Result[] next(long scannerId, int numberOfRows) throws IOException {
+    preProcess();
+    try {
+      final List<Result> rows = connection.nextRows(scannerId, numberOfRows);
+      return validateResults(rows.toArray(new Result[rows.size()]));
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+      throw new RuntimeException(serverException);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Closes the scanner through connection.close(), bracketed by
+   * preProcess() and postProcess().
+   *
+   * If the server-side exception carried by a ThriftHBaseException is not
+   * rethrown by handleIOException, it is only logged at WARN level.
+   *
+   * @param scannerId id of the scanner to close
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   */
+  @Override
+  public void close(long scannerId) throws IOException {
+    preProcess();
+    try {
+      connection.close(scannerId);
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Requests a row lock through connection.lockRow(), bracketed by
+   * preProcess() and postProcess().
+   *
+   * If the server-side exception carried by a ThriftHBaseException is not
+   * rethrown by handleIOException, it is logged at WARN level and -1 is
+   * returned.
+   *
+   * @param regionName name of the region holding the row
+   * @param row the row to lock
+   * @return the value returned by the connection, or -1 on the fallback
+   *         paths described above
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   */
+  @Override
+  public long lockRow(byte[] regionName, byte[] row) throws IOException {
+    preProcess();
+    try {
+      final long lockId = connection.lockRow(regionName, row);
+      return lockId;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+      return -1;
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return -1;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Asynchronous variant of lockRow: hands the request to
+   * connection.lockRowAsync() and returns its future as is. postProcess()
+   * runs as soon as the future has been obtained, not when it completes.
+   *
+   * @param regionName name of the region holding the row
+   * @param row the row to lock
+   * @return the future produced by the underlying connection
+   */
+  public ListenableFuture<RowLock> lockRowAsync(byte[] regionName, byte[] row) {
+    preProcess();
+    try {
+      final ListenableFuture<RowLock> pending =
+          connection.lockRowAsync(regionName, row);
+      return pending;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Releases a row lock through connection.unlockRow(), bracketed by
+   * preProcess() and postProcess().
+   *
+   * If the server-side exception carried by a ThriftHBaseException is not
+   * rethrown by handleIOException, it is only logged at WARN level.
+   *
+   * @param regionName name of the region holding the row
+   * @param lockId id of the lock to release
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   */
+  @Override
+  public void unlockRow(byte[] regionName, long lockId) throws IOException {
+    preProcess();
+    try {
+      connection.unlockRow(regionName, lockId);
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Asynchronous variant of unlockRow: hands the request to
+   * connection.unlockRowAsync() and returns its future as is. postProcess()
+   * runs as soon as the future has been obtained, not when it completes.
+   *
+   * @param regionName name of the region holding the row
+   * @param lockId id of the lock to release
+   * @return the future produced by the underlying connection
+   */
+  public ListenableFuture<Void> unlockRowAsync(byte[] regionName, long lockId) {
+    preProcess();
+    try {
+      final ListenableFuture<Void> pending =
+          connection.unlockRowAsync(regionName, lockId);
+      return pending;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Fetches the region list through connection.getRegionsAssignment() and
+   * copies it into an array, bracketed by preProcess() and postProcess().
+   *
+   * @return the regions reported by the connection; null only if
+   *         refreshConnectionAndThrowIOException returns without throwing
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   * @throws RuntimeException wrapping a server-side exception that
+   *         handleIOException did not rethrow
+   */
+  @Override
+  public HRegionInfo[] getRegionsAssignment() throws IOException {
+    preProcess();
+    try {
+      final List<HRegionInfo> regions = connection.getRegionsAssignment();
+      final HRegionInfo[] asArray = new HRegionInfo[regions.size()];
+      return regions.toArray(asArray);
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+      throw new RuntimeException(serverException);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Fetches the server info through connection.getHServerInfo(), bracketed
+   * by preProcess() and postProcess().
+   *
+   * @return the value returned by the connection; null only if
+   *         refreshConnectionAndThrowIOException returns without throwing
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   * @throws RuntimeException wrapping a server-side exception that
+   *         handleIOException did not rethrow
+   */
+  @Override
+  public HServerInfo getHServerInfo() throws IOException {
+    preProcess();
+    try {
+      final HServerInfo info = connection.getHServerInfo();
+      return info;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+      throw new RuntimeException(serverException);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Sends a {@link MultiAction} through connection.multiAction() and
+   * converts the reply with
+   * MultiResponse.Builder.createFromTMultiResponse(), bracketed by
+   * preProcess() and postProcess().
+   *
+   * @param multi the batch of actions to run
+   * @return the converted response; null only if
+   *         refreshConnectionAndThrowIOException returns without throwing
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   * @throws RuntimeException wrapping a server-side exception that
+   *         handleIOException did not rethrow
+   */
+  @Override
+  public MultiResponse multiAction(MultiAction multi) throws IOException {
+    preProcess();
+    try {
+      final MultiResponse converted =
+          MultiResponse.Builder.createFromTMultiResponse(
+              connection.multiAction(multi));
+      return converted;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+      throw new RuntimeException(serverException);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Sends a {@link MultiPut} through connection.multiPut(), bracketed by
+   * preProcess() and postProcess().
+   *
+   * @param puts the batch of puts to apply
+   * @return the value returned by the connection; null only if
+   *         refreshConnectionAndThrowIOException returns without throwing
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   * @throws RuntimeException wrapping a server-side exception that
+   *         handleIOException did not rethrow
+   */
+  @Override
+  public MultiPutResponse multiPut(MultiPut puts) throws IOException {
+    preProcess();
+    try {
+      final MultiPutResponse response = connection.multiPut(puts);
+      return response;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+      throw new RuntimeException(serverException);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Asks the server to bulk load an HFile through
+   * connection.bulkLoadHFile(). preProcess() runs before the call and
+   * postProcess() always runs afterwards.
+   *
+   * @param hfilePath path of the HFile to load
+   * @param regionName name of the target region
+   * @param familyName name of the target column family
+   * @throws IOException whatever handleIOException or
+   *         refreshConnectionAndThrowIOException raise for the failure
+   * @throws RuntimeException if the server reported an exception that
+   *         handleIOException did not rethrow
+   */
+  @Override
+  public void bulkLoadHFile(String hfilePath, byte[] regionName,
+      byte[] familyName) throws IOException {
+    preProcess();
+    try {
+      connection.bulkLoadHFile(hfilePath, regionName, familyName);
+    } catch (ThriftHBaseException te) {
+      Exception e = te.getServerJavaException();
+      handleIOException(e);
+      // Getting here means handleIOException did not rethrow. Returning
+      // normally would tell the caller that the load succeeded, so surface
+      // the failure the same way next(), multiAction() and multiPut() do.
+      LOG.warn("Unexpected Exception: " + e);
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      refreshConnectionAndThrowIOException(e);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Asks the server to bulk load an HFile through the four-argument
+   * connection.bulkLoadHFile(). preProcess() runs before the call and
+   * postProcess() always runs afterwards.
+   *
+   * @param hfilePath path of the HFile to load
+   * @param regionName name of the target region
+   * @param familyName name of the target column family
+   * @param assignSeqNum forwarded unchanged to the connection
+   * @throws IOException whatever handleIOException or
+   *         refreshConnectionAndThrowIOException raise for the failure
+   * @throws RuntimeException if the server reported an exception that
+   *         handleIOException did not rethrow
+   */
+  @Override
+  public void bulkLoadHFile(String hfilePath, byte[] regionName,
+      byte[] familyName, boolean assignSeqNum) throws IOException {
+    preProcess();
+    try {
+      connection.bulkLoadHFile(hfilePath, regionName, familyName, assignSeqNum);
+    } catch (ThriftHBaseException te) {
+      Exception e = te.getServerJavaException();
+      handleIOException(e);
+      // Getting here means handleIOException did not rethrow. Returning
+      // normally would tell the caller that the load succeeded, so surface
+      // the failure the same way next(), multiAction() and multiPut() do.
+      LOG.warn("Unexpected Exception: " + e);
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      refreshConnectionAndThrowIOException(e);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Asks the server to close a region through connection.closeRegion(),
+   * bracketed by preProcess() and postProcess().
+   *
+   * If the server-side exception carried by a ThriftHBaseException is not
+   * rethrown by handleIOException, it is only logged at WARN level.
+   *
+   * @param hri the region to close
+   * @param reportWhenCompleted forwarded unchanged to the connection
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   */
+  @Override
+  public void closeRegion(HRegionInfo hri, boolean reportWhenCompleted)
+      throws IOException {
+    preProcess();
+    try {
+      connection.closeRegion(hri, reportWhenCompleted);
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Pushes an {@link AssignmentPlan} through
+   * connection.updateFavoredNodes(), bracketed by preProcess() and
+   * postProcess().
+   *
+   * If the server-side exception carried by a ThriftHBaseException is not
+   * rethrown by handleIOException, it is logged at WARN level and -1 is
+   * returned.
+   *
+   * @param plan the plan to send
+   * @return the value returned by the connection, or -1 on the fallback
+   *         paths described above
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   */
+  @Override
+  public int updateFavoredNodes(AssignmentPlan plan) throws IOException {
+    preProcess();
+    try {
+      final int reply = connection.updateFavoredNodes(plan);
+      return reply;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+      return -1;
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return -1;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards to connection.updateConfiguration(), bracketed by preProcess()
+   * and postProcess(). Any exception is handed to
+   * refreshConnectionAndThrowRuntimeException().
+   */
+  @Override
+  public void updateConfiguration() {
+    preProcess();
+    try {
+      connection.updateConfiguration();
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards to connection.stop(), bracketed by preProcess() and
+   * postProcess(). Any exception is handed to
+   * refreshConnectionAndThrowRuntimeException().
+   *
+   * @param why the reason passed on to the connection
+   */
+  @Override
+  public void stop(String why) {
+    preProcess();
+    try {
+      connection.stop(why);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Returns connection.getStopReason(), bracketed by preProcess() and
+   * postProcess(). Any exception is handed to
+   * refreshConnectionAndThrowRuntimeException(); should that call return,
+   * the exception is rethrown wrapped in a RuntimeException.
+   *
+   * @return the value returned by the connection
+   */
+  @Override
+  public String getStopReason() {
+    preProcess();
+    try {
+      final String reason = connection.getStopReason();
+      return reason;
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+      throw new RuntimeException(failure);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards to connection.setNumHDFSQuorumReadThreads(), bracketed by
+   * preProcess() and postProcess(). Any exception is handed to
+   * refreshConnectionAndThrowRuntimeException().
+   *
+   * @param maxThreads value passed on to the connection
+   */
+  @Override
+  public void setNumHDFSQuorumReadThreads(int maxThreads) {
+    preProcess();
+    try {
+      connection.setNumHDFSQuorumReadThreads(maxThreads);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Forwards to connection.setHDFSQuorumReadTimeoutMillis(), bracketed by
+   * preProcess() and postProcess(). Any exception is handed to
+   * refreshConnectionAndThrowRuntimeException().
+   *
+   * @param timeoutMillis value passed on to the connection
+   */
+  @Override
+  public void setHDFSQuorumReadTimeoutMillis(long timeoutMillis) {
+    preProcess();
+    try {
+      connection.setHDFSQuorumReadTimeoutMillis(timeoutMillis);
+    } catch (Exception failure) {
+      refreshConnectionAndThrowRuntimeException(failure);
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Looks up a configuration property through connection.getConfProperty(),
+   * bracketed by preProcess() and postProcess().
+   *
+   * If the server-side exception carried by a ThriftHBaseException is not
+   * rethrown by handleIOException, it is logged at WARN level and null is
+   * returned.
+   *
+   * @param paramName name of the property to read
+   * @return the value returned by the connection, or null on the fallback
+   *         paths described above
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   */
+  @Override
+  public String getConfProperty(String paramName) throws IOException {
+    preProcess();
+    try {
+      final String value = connection.getConfProperty(paramName);
+      return value;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+      return null;
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Fetches the histogram of a region through connection.getHistogram(),
+   * bracketed by preProcess() and postProcess(). An empty bucket list is
+   * reported as null.
+   *
+   * If the server-side exception carried by a ThriftHBaseException is not
+   * rethrown by handleIOException, it is logged at WARN level and null is
+   * returned.
+   *
+   * @param regionName name of the region
+   * @return the non-empty bucket list, or null
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   */
+  @Override
+  public List<Bucket> getHistogram(byte[] regionName) throws IOException {
+    preProcess();
+    try {
+      final List<Bucket> histogram = connection.getHistogram(regionName);
+      return histogram.isEmpty() ? null : histogram;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+      return null;
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
+  /**
+   * Fetches the histogram of one store through
+   * connection.getHistogramForStore(), bracketed by preProcess() and
+   * postProcess(). An empty bucket list is reported as null.
+   *
+   * If the server-side exception carried by a ThriftHBaseException is not
+   * rethrown by handleIOException, it is logged at WARN level and null is
+   * returned.
+   *
+   * @param regionName name of the region
+   * @param family name of the column family
+   * @return the non-empty bucket list, or null
+   * @throws IOException raised by handleIOException or
+   *         refreshConnectionAndThrowIOException
+   */
+  @Override
+  public List<Bucket> getHistogramForStore(byte[] regionName, byte[] family)
+      throws IOException {
+    preProcess();
+    try {
+      final List<Bucket> histogram =
+          connection.getHistogramForStore(regionName, family);
+      return histogram.isEmpty() ? null : histogram;
+    } catch (ThriftHBaseException thriftError) {
+      final Exception serverException = thriftError.getServerJavaException();
+      handleIOException(serverException);
+      LOG.warn("Unexpected Exception: " + serverException);
+      return null;
+    } catch (Exception failure) {
+      refreshConnectionAndThrowIOException(failure);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThreadBasedThriftClientCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThreadBasedThriftClientCache.java?rev=1576884&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThreadBasedThriftClientCache.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThreadBasedThriftClientCache.java Wed Mar 12 20:30:00 2014
@@ -0,0 +1,148 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.thrift;
+
+import com.facebook.nifty.client.FramedClientConnector;
+import com.facebook.nifty.header.client.HeaderClientConnector;
+import com.facebook.swift.service.ThriftClientManager;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * ThriftClient cache which keeps a client cache per thread in order to avoid
+ * reusing the client in parallel threads at the same time.
+ */
+public class ThreadBasedThriftClientCache implements ThriftClientCache {
+  private final static Log LOG =
+      LogFactory.getLog(ThreadBasedThriftClientCache.class);
+
+  private static final ThreadLocal<HashMap<Pair<InetSocketAddress,
+    Class<? extends ThriftClientInterface>>, ThriftClientInterface>> clients
+    = new ThreadLocal<HashMap<Pair<InetSocketAddress,
+        Class<? extends ThriftClientInterface>>, ThriftClientInterface>>() {
+      @Override
+      public HashMap<Pair<InetSocketAddress,
+        Class<? extends ThriftClientInterface>>, ThriftClientInterface>
+      initialValue() {
+        return new HashMap<Pair<InetSocketAddress,
+        Class<? extends ThriftClientInterface>>, ThriftClientInterface>();
+      }
+    };
+
+  private final ThriftClientManager clientManager;
+
+  private boolean useHeaderProtocol;
+
+  public ThreadBasedThriftClientCache(Configuration conf) {
+    clientManager = new ThriftClientManager();
+    useHeaderProtocol = conf.getBoolean(HConstants.USE_HEADER_PROTOCOL,
+      HConstants.DEFAULT_USE_HEADER_PROTOCOL);
+  }
+
+  /**
+   * It has a simple caching, such that when it is called for a same client
+   * multiple times - the cached value will be returned. When called first time
+   * the connection will be created
+   *
+   * @param address - the inet address
+   * @param clazz - type of client
+   * @return
+   * @throws InterruptedException
+   * @throws ExecutionException
+   */
+  public ThriftClientInterface getClient(
+      InetSocketAddress address, Class<? extends ThriftClientInterface> clazz)
+      throws InterruptedException, ExecutionException {
+    Pair<InetSocketAddress, Class<? extends ThriftClientInterface>> key = new Pair<InetSocketAddress, Class<? extends ThriftClientInterface>>(
+        address, clazz);
+    ThriftClientInterface client = clients.get().get(key);
+    if (client != null) {
+      return client;
+    }
+    synchronized (clientManager) {
+      ThriftClientInterface newClient;
+      if (useHeaderProtocol) {
+        newClient = clientManager.createClient(
+          new HeaderClientConnector(address), clazz).get();
+      } else {
+        newClient = clientManager.createClient(
+          new FramedClientConnector(address), clazz).get();
+      }
+      clients.get().put(key, newClient);
+      return newClient;
+    }
+  }
+
+  /**
+   * Closes the client manager and clears the map of cached clients
+   * @throws Exception
+   */
+  public synchronized void shutDownClientManager() throws Exception {
+    for (ThriftClientInterface client : clients.get().values()) {
+      client.close();
+    }
+    clients.get().clear();
+    this.clientManager.close();
+  }
+
+  /**
+   * Closes a connection of the specified client
+   * @param address
+   * @throws Exception
+   */
+  @Override
+  public synchronized void close(InetSocketAddress address,
+      Class<? extends ThriftClientInterface> clazz,
+      ThriftClientInterface client) throws IOException {
+    Pair<InetSocketAddress, Class<? extends ThriftClientInterface>> key =
+        new Pair<InetSocketAddress, Class<? extends ThriftClientInterface>>(
+        address, clazz);
+    ThriftClientInterface cachedClient = clients.get().remove(key);
+    try {
+      if (cachedClient != null) {
+        cachedClient.close();
+      } else {
+        client.close();
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void putBackClient(ThriftClientInterface server,
+      InetSocketAddress addr, Class<? extends ThriftClientInterface> clazz) {
+    // Do nothing for ThreadBasedThriftClientCache.
+  }
+
+  @Override
+  public ThriftClientManager getThriftClientManager() {
+    return this.clientManager;
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftCallStatsReporter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftCallStatsReporter.java?rev=1576884&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftCallStatsReporter.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftCallStatsReporter.java Wed Mar 12 20:30:00 2014
@@ -0,0 +1,477 @@
+/**
+ * Copyright 2013 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc.thrift;
+
+import com.facebook.nifty.core.NiftyRequestContext;
+import com.facebook.nifty.core.RequestContext;
+import com.facebook.nifty.header.transport.THeaderTransport;
+import com.facebook.swift.service.ThriftEventHandler;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Operation;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
+import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
+import org.apache.hadoop.hbase.ipc.HBaseServer.Call;
+import org.apache.hadoop.hbase.ipc.ProfilingData;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.ThriftHRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.thrift.transport.TTransport;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This is the handler class which has callbacks for different stages of the
+ * RPC call processing, pre-read, post-read, pre-write, post-Write etc.
+ *
+ * We will use it to calculate the processing time of calls.
+ *
+ * This class is similar to the ThriftServiceStatsHandler class. That class,
+ * however, maintains an internal storage of aggregated stats, and we cannot
+ * use it to generate warnings based on the processing time, response size
+ * etc. because its PerCallMethodStats class is private.
+ */
+public class ThriftCallStatsReporter extends ThriftEventHandler {
+  static final Log LOG = LogFactory.getLog(ThriftCallStatsReporter.class);
+
+  HBaseRpcMetrics rpcMetrics; // metrics sink updated in done()
+
+  private static final String WARN_RESPONSE_TIME =
+    "hbase.ipc.warn.response.time"; // config key read by the constructor
+  private static final String WARN_RESPONSE_SIZE =
+    "hbase.ipc.warn.response.size"; // config key read by the constructor
+
+  /** Default value for above params */
+  private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
+  private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
+
+  private final int warnResponseTime; // done() flags slower calls; ignored unless > -1
+  private final int warnResponseSize; // done() flags larger responses; ignored unless > -1
+
+  Object serverInstance; // assigned by the constructor
+
+  private boolean useHeaderProtocol; // gates the header handling in postRead()/preWrite()
+
+  public ThriftCallStatsReporter(Configuration conf,
+                                 HBaseRpcMetrics rpcMetrics,
+                                 Object serverInstance) {
+    this.rpcMetrics = rpcMetrics;
+    this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME,
+      DEFAULT_WARN_RESPONSE_TIME);
+    this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
+      DEFAULT_WARN_RESPONSE_SIZE);
+    this.serverInstance = serverInstance;
+    this.useHeaderProtocol = conf.getBoolean(HConstants.USE_HEADER_PROTOCOL,
+      HConstants.DEFAULT_USE_HEADER_PROTOCOL);
+  }
+
+  /**
+   * Per-call bookkeeping for one RPC: the method name, the request context,
+   * the call arguments, the pre-read and post-write timestamps, the request
+   * and response sizes, and whether the call succeeded.
+   */
+  public static class ThriftCallStats {
+    private RequestContext requestContext;
+    private String methodName;
+    private long preReadTimeMs;
+    private long postWriteTimeMs;
+    private Object[] callArgs;
+    private long requestSize;
+    private long responseSize;
+    private boolean success = true;
+
+    /**
+     * @param methodName name as delivered by Swift, possibly dot-qualified
+     * @param requestContext context of the request this call belongs to
+     */
+    ThriftCallStats(String methodName, RequestContext requestContext) {
+      // Swift hands us <ClassName/>.<MethodName/>; keep the last piece only.
+      final String[] pieces = methodName.split("\\.");
+      final String simpleName = pieces[pieces.length - 1];
+      this.methodName = simpleName;
+      this.requestContext = requestContext;
+    }
+
+    // ---- identity of the call ----
+
+    public String getMethodName() {
+      return methodName;
+    }
+
+    public RequestContext getRequestContext() {
+      return requestContext;
+    }
+
+    /** String form of the remote address taken from the request context. */
+    public String getClientAddress() {
+      return requestContext.getRemoteAddress().toString();
+    }
+
+    public Object[] getCallArgs() {
+      return callArgs;
+    }
+
+    public void setCallArgs(Object[] callArgs) {
+      this.callArgs = callArgs;
+    }
+
+    // ---- timing ----
+
+    public long getPreReadTimeMs() {
+      return preReadTimeMs;
+    }
+
+    public void setPreReadTimeMs(long preReadTimeMs) {
+      this.preReadTimeMs = preReadTimeMs;
+    }
+
+    public long getPostWriteTimeMs() {
+      return postWriteTimeMs;
+    }
+
+    public void setPostWriteTimeMs(long postWriteTimeMs) {
+      this.postWriteTimeMs = postWriteTimeMs;
+    }
+
+    /** Start time of the call, which is the pre-read timestamp. */
+    public long getStartTimeMs() {
+      return getPreReadTimeMs();
+    }
+
+    /** Post-write minus pre-read time, never negative. */
+    public long getProcessingTimeMs() {
+      final long elapsed = postWriteTimeMs - preReadTimeMs;
+      return elapsed > 0 ? elapsed : 0;
+    }
+
+    // TODO @gauravm
+    public long getQueueTimeMs() {
+      return 0;
+    }
+
+    // ---- sizes ----
+
+    public long getRequestSize() {
+      return requestSize;
+    }
+
+    public void setRequestSize(int requestSize) {
+      this.requestSize = requestSize;
+    }
+
+    public long getResponseSize() {
+      return responseSize;
+    }
+
+    public void setResponseSize(int responseSize) {
+      this.responseSize = responseSize;
+    }
+
+    // ---- outcome ----
+
+    public boolean getSuccess() {
+      return success;
+    }
+
+    public void setSuccess(boolean success) {
+      this.success = success;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder text = new StringBuilder("ThriftCallStats [");
+      text.append("requestContext=").append(requestContext);
+      text.append(", methodName=").append(methodName);
+      text.append(", preReadTimeMs=").append(preReadTimeMs);
+      text.append(", postWriteTimeMs=").append(postWriteTimeMs);
+      text.append(", callArgs=").append(Arrays.toString(callArgs));
+      text.append(", requestSize=").append(requestSize);
+      text.append(", responseSize=").append(responseSize);
+      text.append(", success=").append(success);
+      return text.append("]").toString();
+    }
+  }
+
+  /**
+   * Creates the per-call context: a fresh ThriftCallStats for every RPC call.
+   * @param methodName name of the invoked method as delivered by the framework
+   * @param requestContext context of the incoming request
+   * @return the new ThriftCallStats instance
+   */
+  @Override
+  public Object getContext(String methodName, RequestContext requestContext) {
+    final ThriftCallStats perCall =
+        new ThriftCallStats(methodName, requestContext);
+    return perCall;
+  }
+
+  /**
+   * Stamps the call with the current time taken from EnvironmentEdgeManager.
+   * @param context the ThriftCallStats created by getContext()
+   * @param methodName name of the invoked method (unused)
+   */
+  @Override
+  public void preRead(Object context, String methodName) {
+    final long now = EnvironmentEdgeManager.currentTimeMillis();
+    ((ThriftCallStats) context).setPreReadTimeMs(now);
+  }
+
+  /**
+   * Records the call arguments and the request size. When the header
+   * protocol is in use, it also extracts the call info sent by the client
+   * and publishes it in HRegionServer.callContext.
+   *
+   * Profiling is switched on for the call when server-side profiling is
+   * enabled for all calls or when the client asked for it. Otherwise any
+   * profiling data on the client-supplied call is cleared.
+   *
+   * @param context the ThriftCallStats created by getContext()
+   * @param methodName name of the invoked method (unused)
+   * @param args arguments of the call
+   */
+  @Override
+  public void postRead(Object context, String methodName, Object[] args) {
+    final ThriftCallStats stats = (ThriftCallStats) context;
+    stats.setCallArgs(args);
+    stats.setRequestSize(getBytesRead(stats));
+
+    if (!useHeaderProtocol) {
+      return;
+    }
+    if (!(stats.requestContext instanceof NiftyRequestContext)) {
+      LOG.error("Request Context was not THeaderTransport, server cannot read headers");
+      return;
+    }
+
+    Call call = getCallInfoFromClient(stats.requestContext);
+    final boolean profile =
+        HRegionServer.enableServerSideProfilingForAllCalls.get()
+            || (call != null && call.isShouldProfile());
+    if (profile) {
+      // call may be null here when profiling is enabled on the server side
+      // only
+      if (call == null) {
+        call = new Call(HBaseRPCOptions.DEFAULT);
+      }
+      call.setShouldProfile(true);
+      call.setProfilingData(new ProfilingData());
+    } else if (call != null) {
+      // a call was supplied but is not to be profiled: drop profiling data
+      call.setProfilingData(null);
+    }
+    HRegionServer.callContext.set(call);
+  }
+
+  /**
+   * Records the write timestamp and the bytes written so far. When the
+   * header protocol is in use, it also sends the call info (used for
+   * profiling data) to the client for calls that are being profiled, and
+   * clears HRegionServer.callContext.
+   * TODO: (adela) Check if the header needs to be sent every time or only
+   * when profiling is required
+   *
+   * The timestamp comes from EnvironmentEdgeManager, the same clock preRead()
+   * uses, so that the processing time is the difference of two readings of
+   * one clock. A failure while sending the header is logged together with
+   * its stack trace and otherwise ignored.
+   *
+   * @param context the ThriftCallStats created by getContext()
+   * @param methodName name of the invoked method (unused)
+   * @param result result of the call (unused)
+   */
+  @Override
+  public void preWrite(Object context, String methodName, Object result) {
+    ThriftCallStats ctx = (ThriftCallStats) context;
+    ctx.setPostWriteTimeMs(EnvironmentEdgeManager.currentTimeMillis());
+    ctx.setResponseSize(getBytesWritten(ctx));
+    if (useHeaderProtocol) {
+      try {
+        Call call = HRegionServer.callContext.get();
+        HRegionServer.callContext.remove();
+        if (call != null && call.isShouldProfile()) {
+          sendCallInfoToClient(call, ctx.requestContext);
+        }
+      } catch (Exception e) {
+        // Route the stack trace through the logger instead of stderr.
+        LOG.error("Exception during sending header happened", e);
+      }
+    }
+  }
+
+  /**
+   * Records the final write timestamp and the number of bytes written. The
+   * timestamp comes from EnvironmentEdgeManager, the same clock preRead()
+   * uses, so that the processing time is the difference of two readings of
+   * one clock.
+   *
+   * @param context the ThriftCallStats created by getContext()
+   * @param methodName name of the invoked method (unused)
+   * @param result result of the call (unused)
+   */
+  @Override
+  public void postWrite(Object context, String methodName, Object result) {
+    ThriftCallStats ctx = (ThriftCallStats) context;
+    ctx.setPostWriteTimeMs(EnvironmentEdgeManager.currentTimeMillis());
+    ctx.setResponseSize(getBytesWritten(ctx));
+  }
+
+  /**
+   * Returns the read byte count reported by the Nifty transport of the call,
+   * or 0 when the request context is not a {@link NiftyRequestContext}.
+   *
+   * @param ctx statistics object holding the request context of the call
+   * @return number of bytes read, or 0 if it cannot be determined
+   */
+  private int getBytesRead(ThriftCallStats ctx) {
+    if (ctx.requestContext instanceof NiftyRequestContext) {
+      NiftyRequestContext niftyContext = (NiftyRequestContext) ctx.requestContext;
+      return niftyContext.getNiftyTransport().getReadByteCount();
+    }
+    return 0;
+  }
+
+  /**
+   * Marks the call as failed and logs the error that occurred before the
+   * request was read.
+   *
+   * @param context the {@link ThriftCallStats} tracking this call
+   * @param methodName name of the RPC method that failed
+   * @param t the error that was raised
+   */
+  public void preReadException(Object context, String methodName, Throwable t) {
+    ThriftCallStats ctx = (ThriftCallStats) context;
+    ctx.setSuccess(false);
+    // Attach the throwable to the log record instead of dumping it to stderr.
+    LOG.error("RPC call " + methodName + " failed before read.", t);
+  }
+
+  /**
+   * Marks the call as failed and logs the error that occurred after the
+   * request was read.
+   *
+   * @param context the {@link ThriftCallStats} tracking this call
+   * @param methodName name of the RPC method that failed
+   * @param t the error that was raised
+   */
+  public void postReadException(Object context, String methodName, Throwable t) {
+    ThriftCallStats ctx = (ThriftCallStats) context;
+    ctx.setSuccess(false);
+    // Attach the throwable to the log record instead of dumping it to stderr.
+    LOG.error("RPC call " + methodName + " failed after read.", t);
+  }
+
+  /**
+   * Publishes the metrics of a finished call. Nothing is recorded for calls
+   * that were marked unsuccessful. For successful calls the processing time
+   * is added to the RPC metrics; calls that exceed the configured response
+   * time or response size thresholds are additionally logged and counted, and
+   * calls that took more than one second are counted separately.
+   *
+   * @param context the {@link ThriftCallStats} tracking this call
+   * @param methodName name of the RPC method (the name stored in the
+   *          statistics object is the one that is reported)
+   */
+  public void done(Object context, String methodName) {
+    final ThriftCallStats stats = (ThriftCallStats) context;
+    if (!stats.getSuccess()) {
+      return;
+    }
+
+    final int elapsedMs = (int) (stats.getProcessingTimeMs());
+    rpcMetrics.rpcProcessingTime.inc(elapsedMs);
+    rpcMetrics.inc(stats.getMethodName(), elapsedMs);
+
+    final long size = stats.getResponseSize();
+    // a threshold of -1 (or lower) switches the corresponding warning off
+    final boolean tooSlow = elapsedMs > warnResponseTime
+        && warnResponseTime > -1;
+    final boolean tooLarge = size > warnResponseSize
+        && warnResponseSize > -1;
+
+    if (tooSlow || tooLarge) {
+      // when both thresholds are exceeded the call is reported as TooLarge
+      final String tag = tooLarge ? "TooLarge" : "TooSlow";
+      try {
+        logResponse(stats, tag);
+      } catch (IOException e) {
+        LOG.info("(operation" + tag + "): " +
+                 stats.getMethodName() + ". Error while logging: " + e);
+      }
+      if (tooSlow) {
+        // increment the global and the per-method slow response counters
+        rpcMetrics.inc("slowResponse.", elapsedMs);
+        HRegion.incrNumericPersistentMetric("slowResponse.all.cumulative", 1);
+        HRegion.incrNumericPersistentMetric(
+            "slowResponse." + stats.getMethodName() + ".cumulative", 1);
+      }
+      if (tooLarge) {
+        // increment the global and the per-method large response counters
+        rpcMetrics.inc("largeResponse.kb.", (int) (size / 1024));
+        HRegion.incrNumericPersistentMetric("largeResponse.all.cumulative", 1);
+        HRegion.incrNumericPersistentMetric(
+            "largeResponse." + stats.getMethodName() + ".cumulative", 1);
+      }
+    }
+
+    // The one second period is hard-coded so that the period we are warning
+    // about can be stated in the name of the metric itself.
+    if (elapsedMs > 1000) {
+      rpcMetrics.inc(stats.getMethodName() + ".aboveOneSec.", elapsedMs);
+      HRegion.incrNumericPersistentMetric(
+          stats.getMethodName() + ".aboveOneSec.cumulative", 1);
+    }
+  }
+  /**
+   * Write to the log if an operation is too slow or too large.
+   *
+   * @param ctx Context for the thrift call
+   * @param tag "tooSlow", or "tooLarge"?
+   * @throws IOException
+   */
+  private void logResponse(ThriftCallStats ctx, String tag)
+    throws IOException {
+    Object params[] = ctx.getCallArgs();
+    // for JSON encoding
+    ObjectMapper mapper = new ObjectMapper();
+    // base information that is reported regardless of type of call
+    Map<String, Object> responseInfo = new HashMap<String, Object>();
+    responseInfo.put("starttimems", ctx.getStartTimeMs());
+    responseInfo.put("processingtimems", ctx.getProcessingTimeMs());
+    responseInfo.put("queuetimems", ctx.getQueueTimeMs());
+    responseInfo.put("responsesize", ctx.getResponseSize());
+    responseInfo.put("client", ctx.getClientAddress());
+    responseInfo.put("class", serverInstance.getClass().getSimpleName());
+    responseInfo.put("method", ctx.getMethodName());
+
+    if (params.length == 2 && serverInstance instanceof ThriftHRegionServer &&
+      params[0] instanceof byte[] &&
+      params[1] instanceof Operation) {
+      // if the slow process is a query, we want to log its table as well
+      // as its own fingerprint
+      byte [] tableName =
+        HRegionInfo.parseRegionName((byte[]) params[0])[0];
+      responseInfo.put("table", Bytes.toStringBinary(tableName));
+      // annotate the response map with operation details
+      responseInfo.putAll(((Operation) params[1]).toMap());
+      // report to the log file
+      LOG.warn("(operation" + tag + "): " +
+        mapper.writeValueAsString(responseInfo));
+    } else if (params.length == 1 && serverInstance instanceof ThriftHRegionServer &&
+      params[0] instanceof Operation) {
+      // annotate the response map with operation details
+      responseInfo.putAll(((Operation) params[0]).toMap());
+      // report to the log file
+      LOG.warn("(operation" + tag + "): " +
+        mapper.writeValueAsString(responseInfo));
+    } else {
+      // can't get JSON details, so just report call.toString() along with
+      // a more generic tag.
+      // responseInfo.put("call", call.toString());
+      LOG.warn("(response" + tag + "): " +
+        mapper.writeValueAsString(responseInfo));
+    }
+  }
+
+  /**
+   * Returns the written byte count reported by the Nifty transport of the
+   * call, or 0 when the request context is not a {@link NiftyRequestContext}.
+   *
+   * @param ctx statistics object holding the request context of the call
+   * @return number of bytes written, or 0 if it cannot be determined
+   */
+  private int getBytesWritten(ThriftCallStats ctx) {
+    if (ctx.requestContext instanceof NiftyRequestContext) {
+      NiftyRequestContext niftyContext = (NiftyRequestContext) ctx.requestContext;
+      return niftyContext.getNiftyTransport().getWrittenByteCount();
+    }
+    // Standard TTransport interface doesn't give us a way to determine how
+    // many bytes were written
+    return 0;
+  }
+
+  /**
+   * Send information about the call to the client by serializing the call
+   * and attaching it as a header on the output transport. Nothing is sent
+   * when {@code call} is null or when the output transport is not a
+   * {@link THeaderTransport}.
+   *
+   * @param call the call to serialize, may be null
+   * @param requestContext context giving access to the protocols of the call
+   * @throws Exception if serializing the call or setting the header fails
+   */
+  private void sendCallInfoToClient(Call call, RequestContext requestContext)
+      throws Exception {
+    if (call == null) {
+      return;
+    }
+    TTransport outputTransport = requestContext.getOutputProtocol()
+        .getTransport();
+    if (!(outputTransport instanceof THeaderTransport)) {
+      LOG.error("output transport was not THeaderTransport, client cannot read headers");
+      return;
+    }
+    // Cast the transport that was just type-checked. The previous code
+    // checked the output transport but cast the input transport.
+    THeaderTransport headerTransport = (THeaderTransport) outputTransport;
+    String dataString = Bytes
+        .writeThriftBytesAndGetString(call, Call.class);
+    headerTransport.setHeader(HConstants.THRIFT_HEADER_FROM_SERVER, dataString);
+  }
+
+  /**
+   * Construct the Call object from the information that client gave us in
+   * the {@code HConstants.THRIFT_HEADER_FROM_CLIENT} header.
+   *
+   * @param requestContext context giving access to the protocols of the call
+   * @return the deserialized call, or null when the input transport is not a
+   *         {@link THeaderTransport}, the header is absent, or the header
+   *         cannot be deserialized
+   */
+  private Call getCallInfoFromClient(RequestContext requestContext) {
+    TTransport inputTransport = requestContext.getInputProtocol()
+        .getTransport();
+    if (inputTransport instanceof THeaderTransport) {
+      THeaderTransport headerTransport = (THeaderTransport) inputTransport;
+      String dataString = headerTransport.getReadHeaders().get(
+          HConstants.THRIFT_HEADER_FROM_CLIENT);
+      if (dataString != null) {
+        byte[] dataBytes = Bytes.hexToBytes(dataString);
+        try {
+          return Bytes.readThriftBytes(dataBytes, Call.class);
+        } catch (Exception e) {
+          // Keep the cause in the server log instead of printing it to stderr.
+          LOG.error("Deserialization of the call header didn't succeed.", e);
+        }
+      }
+    } else {
+      // instanceof is also false for null, so guard the getClass() call
+      LOG.error("Input transport is not instance of THeaderTransport, but "
+          + (inputTransport == null ? "null"
+              : inputTransport.getClass().getName()));
+    }
+    return null;
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCache.java?rev=1576884&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCache.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCache.java Wed Mar 12 20:30:00 2014
@@ -0,0 +1,80 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc.thrift;
+
+import com.facebook.swift.service.ThriftClientManager;
+import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A cache that hands out {@link ThriftClientInterface} instances per server
+ * address and client interface class, and takes them back after use.
+ */
+public interface ThriftClientCache {
+
+  /**
+   * Gives a Thrift client given the InetSocketAddress and class name for
+   * the ThriftClientInterface.
+   *
+   * @param address address of the server the client should talk to
+   * @param clazz interface class of the requested client
+   * @return a client for the given address and interface class
+   * @throws InterruptedException
+   * @throws ExecutionException
+   * @throws Exception
+   */
+  ThriftClientInterface getClient(
+      InetSocketAddress address, Class<? extends ThriftClientInterface> clazz)
+      throws InterruptedException, ExecutionException, Exception;
+
+  /**
+   * Put back the ThriftClientInterface that was taken using
+   * {@link ThriftClientCache#getClient(InetSocketAddress, Class)}.
+   *
+   * @param server the client that is handed back
+   * @param addr address the client was requested for
+   * @param clazz interface class the client was requested for
+   * @throws Exception
+   */
+  void putBackClient(ThriftClientInterface server, InetSocketAddress addr,
+      Class<? extends ThriftClientInterface> clazz) throws Exception;
+
+  /**
+   * Getter for the ThriftClientManager.
+   *
+   * @return the ThriftClientManager of this cache
+   */
+  ThriftClientManager getThriftClientManager();
+
+  /**
+   * Closes the connection.
+   *
+   * @param address address the client was requested for
+   * @param clazz interface class the client was requested for
+   * @param client the client whose connection is closed
+   * @throws IOException
+   */
+  void close(InetSocketAddress address,
+      Class<? extends ThriftClientInterface> clazz, ThriftClientInterface client)
+      throws IOException;
+
+  /**
+   * Shuts down the client manager and also the connections present in the
+   * pools.
+   *
+   * @throws Exception
+   */
+  void shutDownClientManager() throws Exception;
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheFactory.java?rev=1576884&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheFactory.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheFactory.java Wed Mar 12 20:30:00 2014
@@ -0,0 +1,49 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc.thrift;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Client Cache factory which creates the respective instances of
+ * ThriftClientCache.
+ */
+public class ThriftClientCacheFactory {
+  private Configuration conf;
+  public ThriftClientCacheFactory(Configuration conf) {
+    this.conf = conf;
+  }
+  /**
+   * Returns a ThriftClientCacheWithConnectionPooling
+   * @return
+   */
+  public ThriftClientCache createClientCacheWithPooling() {
+    return new ThriftClientCacheWithConnectionPooling(conf);
+  }
+
+  /**
+   * Returns a ThreadBasedThriftClientCache
+   * @return
+   */
+  public ThriftClientCache createThreadBasedClientCache() {
+    return new ThreadBasedThriftClientCache(conf);
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java?rev=1576884&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java Wed Mar 12 20:30:00 2014
@@ -0,0 +1,199 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc.thrift;
+
+import com.facebook.swift.service.ThriftClientManager;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * ThriftClientCache with a backing connection pool for each server. This also
+ * provides knobs to control the rate at which eviction of connections in the
+ * pool happen.
+ */
+public class ThriftClientCacheWithConnectionPooling implements
+    ThriftClientCache {
+
+  private static final Log LOG = LogFactory
+      .getLog(ThriftClientCacheWithConnectionPooling.class);
+  public static final String TIME_BETWEEN_EVICTION_RUNS_MILLIS =
+      "hbase.client.cachepool.timeBetweenEvictionRunsMillis";
+  public static final String MIN_EVICTABLE_IDLE_TIME_MILLIS =
+      "hbase.client.cachepool.minEvictableIdleTimeMillis";
+  public static final String WHEN_EXHAUSTED_MAX_WAITTIME =
+      "hbase.client.cachepool.whenExhaustedMaxWaitTime";
+  public static final String WHEN_EXHAUSTED =
+      "hbase.client.cachepool.whenExhausted";
+  public static final String MAX_ACTIVE = "hbase.client.cachepool.maxActive";
+  public static final String MIN_IDLE = "hbase.client.cachepool.minIdle";
+
+  // Defaults used when the corresponding configuration key is not set.
+  private static final long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = 1000;
+  private static final long DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS = 30000;
+  private static final long DEFAULT_WHEN_EXHAUSTED_MAX_WAITTIME = 1000;
+  private static final int DEFAULT_MAX_ACTIVE = 2000;
+  // Keep some idle connections to prevent asynchronous client from contention
+  //   on a single connection to each region server.
+  // WARNING: Default maxIdle value is 8. If you want minIdle to be larger
+  //   than 8, you have to also modify config.maxIdle.
+  private static final int DEFAULT_MIN_IDLE = 5;
+
+  private final ThriftClientManager clientManager;
+  private final Map<Pair<InetSocketAddress,
+    Class<? extends ThriftClientInterface>>,
+      GenericObjectPool<ThriftClientInterface>>
+    clientPools;
+  private final Configuration conf;
+
+  /**
+   * @param conf configuration the pool settings are read from
+   */
+  public ThriftClientCacheWithConnectionPooling(Configuration conf) {
+    this.conf = conf;
+    clientManager = new ThriftClientManager();
+    clientPools = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Builds the key under which the pool for a server and client interface
+   * is stored.
+   */
+  private static Pair<InetSocketAddress, Class<? extends ThriftClientInterface>>
+      poolKey(InetSocketAddress address,
+          Class<? extends ThriftClientInterface> clazz) {
+    return new Pair<InetSocketAddress, Class<? extends ThriftClientInterface>>(
+        address, clazz);
+  }
+
+  /**
+   * Borrows a client from the pool of the given server and interface class.
+   * The pool is created on first use.
+   *
+   * @param address address of the server
+   * @param clazz interface class of the requested client
+   * @return a client borrowed from the pool
+   * @throws Exception if no client can be borrowed from the pool
+   */
+  @Override
+  public ThriftClientInterface getClient(InetSocketAddress address,
+      Class<? extends ThriftClientInterface> clazz) throws Exception {
+    Pair<InetSocketAddress, Class<? extends ThriftClientInterface>> key =
+        poolKey(address, clazz);
+    GenericObjectPool<ThriftClientInterface> clientPool = clientPools.get(key);
+    if (clientPool == null) {
+      // Re-check under the lock so that only one pool is created per key.
+      synchronized (clientPools) {
+        clientPool = clientPools.get(key);
+        if (clientPool == null) {
+          clientPool = createGenericObjectPool(address, clazz);
+          clientPools.put(key, clientPool);
+        }
+      }
+    }
+    return clientPool.borrowObject();
+  }
+
+  /**
+   * The GenericObjectPool can be re-configured as per the documentation here:
+   * http://commons.apache.org/proper/commons-pool/api-1.6/org/
+   * apache/commons/pool/impl/GenericObjectPool.html
+   *
+   * @param address address of the server the pooled clients connect to
+   * @param clazz interface class of the pooled clients
+   * @return a new pool configured from the configuration of this cache
+   */
+  public GenericObjectPool<ThriftClientInterface> createGenericObjectPool(
+      InetSocketAddress address, Class<? extends ThriftClientInterface> clazz) {
+    ThriftClientObjectFactory factory = new ThriftClientObjectFactory(address,
+        clazz, this.clientManager, this.conf);
+    GenericObjectPool.Config config = new GenericObjectPool.Config();
+
+    config.maxActive = conf.getInt(MAX_ACTIVE, DEFAULT_MAX_ACTIVE);
+    config.timeBetweenEvictionRunsMillis =
+        conf.getLong(TIME_BETWEEN_EVICTION_RUNS_MILLIS,
+            DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS);
+    config.minEvictableIdleTimeMillis =
+        conf.getLong(MIN_EVICTABLE_IDLE_TIME_MILLIS,
+            DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS);
+    config.minIdle =
+        conf.getInt(MIN_IDLE, DEFAULT_MIN_IDLE);
+
+    // The idea is to fail fast if we run out of the maximum number of
+    // connections for one region server, which by default is pretty large
+    // (DEFAULT_MAX_ACTIVE). If we exhaust that many connections, we have
+    // bigger problems to fix.
+    config.whenExhaustedAction = (byte)conf.getInt(WHEN_EXHAUSTED,
+        GenericObjectPool.WHEN_EXHAUSTED_FAIL);
+    config.maxWait = conf.getLong(WHEN_EXHAUSTED_MAX_WAITTIME,
+        DEFAULT_WHEN_EXHAUSTED_MAX_WAITTIME);
+    config.lifo = false;
+    return new GenericObjectPool<ThriftClientInterface>(factory, config);
+  }
+
+  /**
+   * Puts back a ThriftClientInterface back to the client pool. Reuse of the
+   * ThriftClientInterface after placing it back into the cache might result in
+   * unpredicted behavior. If there is no pool for the given address and
+   * interface class, the client is closed instead.
+   *
+   * @throws Exception
+   */
+  @Override
+  public void putBackClient(ThriftClientInterface client,
+      InetSocketAddress address, Class<? extends ThriftClientInterface> clazz)
+      throws Exception {
+    GenericObjectPool<ThriftClientInterface> clientPool =
+        clientPools.get(poolKey(address, clazz));
+    if (clientPool == null) {
+      client.close();
+      return;
+    }
+    clientPool.returnObject(client);
+  }
+
+  @Override
+  public ThriftClientManager getThriftClientManager() {
+    return this.clientManager;
+  }
+
+  /**
+   * Invalidates the client in its pool, or closes it directly when there is
+   * no pool for the given address and interface class.
+   *
+   * @throws IOException wrapping any failure raised while closing
+   */
+  @Override
+  public void close(InetSocketAddress address,
+      Class<? extends ThriftClientInterface> clazz, ThriftClientInterface client)
+      throws IOException {
+    GenericObjectPool<ThriftClientInterface> clientPool =
+        clientPools.get(poolKey(address, clazz));
+    try {
+      if (clientPool == null) {
+        client.close();
+        return;
+      }
+
+      clientPool.invalidateObject(client);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Closes every connection pool of this cache and forgets about them. All
+   * pools are attempted even if closing one of them fails; the first failure
+   * is rethrown once all pools have been processed.
+   *
+   * NOTE(review): the ThriftClientManager itself is not closed here, as
+   * before - confirm whether callers expect it to stay usable.
+   *
+   * @throws Exception the first failure raised while closing a pool
+   */
+  @Override
+  public synchronized void shutDownClientManager() throws Exception {
+    IOException firstFailure = null;
+    for (Pair<InetSocketAddress, Class<? extends ThriftClientInterface>> pair : this.clientPools
+        .keySet()) {
+      try {
+        closeAll(pair.getFirst(), pair.getSecond());
+      } catch (IOException e) {
+        LOG.error("Could not close the connection pool for " + pair.getFirst(), e);
+        if (firstFailure == null) {
+          firstFailure = e;
+        }
+      }
+    }
+    clientPools.clear();
+    if (firstFailure != null) {
+      throw firstFailure;
+    }
+  }
+
+  /**
+   * Closes the pool of the given server and interface class, if there is one.
+   * The pool is closed rather than just cleared: clearing only drops the
+   * currently idle clients and leaves the pool's eviction task running, which
+   * would keep re-creating the configured minimum of idle connections.
+   *
+   * @throws IOException wrapping any failure raised while closing the pool
+   */
+  private void closeAll(InetSocketAddress server,
+      Class<? extends ThriftClientInterface> clientInterface)
+      throws IOException {
+    GenericObjectPool<ThriftClientInterface> clientPool =
+        clientPools.get(poolKey(server, clientInterface));
+    if (clientPool == null) {
+      return;
+    }
+    try {
+      clientPool.close();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientObjectFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientObjectFactory.java?rev=1576884&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientObjectFactory.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientObjectFactory.java Wed Mar 12 20:30:00 2014
@@ -0,0 +1,95 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc.thrift;
+
+import com.facebook.nifty.client.FramedClientConnector;
+import com.facebook.nifty.duplex.TDuplexProtocolFactory;
+import com.facebook.nifty.header.client.HeaderClientConnector;
+import com.facebook.nifty.header.protocol.TFacebookCompactProtocol;
+import com.facebook.swift.service.ThriftClient;
+import com.facebook.swift.service.ThriftClientConfig;
+import com.facebook.swift.service.ThriftClientManager;
+import io.airlift.units.Duration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.pool.BasePoolableObjectFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pool object factory that opens {@link ThriftClientInterface} connections to
+ * one server address, using either the header protocol or a framed connector
+ * depending on {@code HConstants.USE_HEADER_PROTOCOL}.
+ */
+public class ThriftClientObjectFactory extends
+    BasePoolableObjectFactory<ThriftClientInterface> {
+  private final Log LOG = LogFactory.getLog(ThriftClientObjectFactory.class);
+  private final InetSocketAddress address;
+  private final Class<? extends ThriftClientInterface> clazz;
+  private final ThriftClientManager clientManager;
+  private final Configuration conf;
+  private final boolean useHeaderProtocol;
+  private final ThriftClientConfig thriftClientConfig;
+
+  /**
+   * @param address address of the server the created clients connect to
+   * @param clazz interface class of the created clients
+   * @param clientManager manager used to create the clients
+   * @param conf configuration providing the protocol choice and the timeout
+   */
+  public ThriftClientObjectFactory(final InetSocketAddress address,
+      final Class<? extends ThriftClientInterface> clazz,
+      ThriftClientManager clientManager,
+      Configuration conf) {
+    this.address = address;
+    this.clazz = clazz;
+    this.clientManager = clientManager;
+    this.conf = conf;
+    this.useHeaderProtocol = this.conf.getBoolean(HConstants.USE_HEADER_PROTOCOL,
+      HConstants.DEFAULT_USE_HEADER_PROTOCOL);
+
+    // the RPC timeout setting doubles as the connect timeout of the client
+    final int connectTimeoutMs = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    final ThriftClientConfig clientConfig = new ThriftClientConfig();
+    this.thriftClientConfig = clientConfig
+      .setMaxFrameSize(HConstants.SWIFT_MAX_FRAME_SIZE_BYTES_DEFAULT)
+      .setConnectTimeout(new Duration(connectTimeoutMs, TimeUnit.MILLISECONDS));
+  }
+
+  /**
+   * Opens a new client connection and waits for it to be established.
+   *
+   * TODO: create the connectors in async fashion and return ListenableFuture<ThriftClientInterface>
+   */
+  @Override
+  public ThriftClientInterface makeObject() throws Exception {
+    final ThriftClient<? extends ThriftClientInterface> thriftClient =
+      new ThriftClient<>(clientManager, clazz, thriftClientConfig,
+        clazz.getName());
+
+    if (!useHeaderProtocol) {
+      final TDuplexProtocolFactory compactProtocol = TDuplexProtocolFactory
+          .fromSingleFactory(new TFacebookCompactProtocol.Factory());
+      return thriftClient.open(
+          new FramedClientConnector(address, compactProtocol)).get();
+    }
+    return thriftClient.open(new HeaderClientConnector(address)).get();
+  }
+
+  /**
+   * Closes the given client. A failure to close is logged and not propagated.
+   */
+  @Override
+  public void destroyObject(ThriftClientInterface obj) {
+    try {
+      obj.close();
+    } catch (Exception e) {
+      LOG.error("Connection could not be closed properly : " + this.address, e);
+    }
+  }
+}



Mime
View raw message