accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [3/3] accumulo git commit: ACCUMULO-3425 Add documentation, remove inner classes, reorder methods.
Date Tue, 16 Dec 2014 23:51:49 GMT
ACCUMULO-3425 Add documentation, remove inner classes, reorder methods.

Overall, clean up ThriftUtil and TServerUtils to not be such
monstrous classes full of spattered factory methods, undocumented
methods and odd interplay. Ensures that ThriftUtil only deals with
client-facing or client and server-facing code, while TServerUtils deals
with server-facing code only.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f3878f5f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f3878f5f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f3878f5f

Branch: refs/heads/master
Commit: f3878f5f677b3c80d69b7b7c8f3c351f8e5e77d1
Parents: 0433e03
Author: Josh Elser <elserj@apache.org>
Authored: Tue Dec 16 18:37:57 2014 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Tue Dec 16 18:41:34 2014 -0500

----------------------------------------------------------------------
 .../core/client/impl/ThriftTransportPool.java   |   4 +-
 .../rpc/ProtocolOverridingSSLSocketFactory.java | 103 ++++++
 .../apache/accumulo/core/rpc/ThriftUtil.java    | 322 +++++++++----------
 .../apache/accumulo/core/rpc/TraceProtocol.java |  47 +++
 .../accumulo/core/rpc/TraceProtocolFactory.java |  33 ++
 .../accumulo/server/rpc/TServerUtils.java       | 102 +++++-
 6 files changed, 432 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3878f5f/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index 7a6e6ab..1220850 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -385,8 +385,8 @@ public class ThriftTransportPool {
 
   private ThriftTransportPool() {}
 
-  public TTransport getTransportWithDefaultTimeout(HostAndPort addr, ClientContext context)
throws TTransportException {
-    return getTransport(String.format("%s:%d", addr.getHostText(), addr.getPort()), context.getClientTimeoutInMillis(),
context);
+  public TTransport getTransportWithDefaultTimeout(String addr, ClientContext context) throws
TTransportException {
+    return getTransport(addr, context.getClientTimeoutInMillis(), context);
   }
 
   public TTransport getTransport(String location, long milliseconds, ClientContext context)
throws TTransportException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3878f5f/core/src/main/java/org/apache/accumulo/core/rpc/ProtocolOverridingSSLSocketFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ProtocolOverridingSSLSocketFactory.java
b/core/src/main/java/org/apache/accumulo/core/rpc/ProtocolOverridingSSLSocketFactory.java
new file mode 100644
index 0000000..cc8ca95
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ProtocolOverridingSSLSocketFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * JDK6's SSLSocketFactory doesn't seem to properly set the protocols on the Sockets that
it creates which causes an SSLv2 client hello message during
+ * handshake, even when only TLSv1 is enabled. This only appears to be an issue on the client
sockets, not the server sockets.
+ *
+ * This class wraps the SSLSocketFactory ensuring that the Socket is properly configured.
+ * http://www.coderanch.com/t/637177/Security/Disabling-handshake-message-Java
+ *
+ * This class can be removed when JDK6 support is officially unsupported by Accumulo
+ */
+class ProtocolOverridingSSLSocketFactory extends SSLSocketFactory {
+
+  private final SSLSocketFactory delegate;
+  private final String[] enabledProtocols;
+
+  public ProtocolOverridingSSLSocketFactory(final SSLSocketFactory delegate, final String[]
enabledProtocols) {
+    Preconditions.checkNotNull(enabledProtocols);
+    Preconditions.checkArgument(0 != enabledProtocols.length, "Expected at least one protocol");
+    this.delegate = delegate;
+    this.enabledProtocols = enabledProtocols;
+  }
+
+  @Override
+  public String[] getDefaultCipherSuites() {
+    return delegate.getDefaultCipherSuites();
+  }
+
+  @Override
+  public String[] getSupportedCipherSuites() {
+    return delegate.getSupportedCipherSuites();
+  }
+
+  @Override
+  public Socket createSocket(final Socket socket, final String host, final int port, final
boolean autoClose) throws IOException {
+    final Socket underlyingSocket = delegate.createSocket(socket, host, port, autoClose);
+    return overrideProtocol(underlyingSocket);
+  }
+
+  @Override
+  public Socket createSocket(final String host, final int port) throws IOException, UnknownHostException
{
+    final Socket underlyingSocket = delegate.createSocket(host, port);
+    return overrideProtocol(underlyingSocket);
+  }
+
+  @Override
+  public Socket createSocket(final String host, final int port, final InetAddress localAddress,
final int localPort) throws IOException, UnknownHostException {
+    final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
+    return overrideProtocol(underlyingSocket);
+  }
+
+  @Override
+  public Socket createSocket(final InetAddress host, final int port) throws IOException {
+    final Socket underlyingSocket = delegate.createSocket(host, port);
+    return overrideProtocol(underlyingSocket);
+  }
+
+  @Override
+  public Socket createSocket(final InetAddress host, final int port, final InetAddress localAddress,
final int localPort) throws IOException {
+    final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
+    return overrideProtocol(underlyingSocket);
+  }
+
+  /**
+   * Set the {@link javax.net.ssl.SSLSocket#getEnabledProtocols() enabled protocols} to {@link
#enabledProtocols} if the <code>socket</code> is a
+   * {@link SSLSocket}
+   *
+   * @param socket
+   *          The Socket
+   */
+  private Socket overrideProtocol(final Socket socket) {
+    if (socket instanceof SSLSocket) {
+      ((SSLSocket) socket).setEnabledProtocols(enabledProtocols);
+    }
+    return socket;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3878f5f/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
index 7f4609b..c95a62b 100644
--- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -18,20 +18,12 @@ package org.apache.accumulo.core.rpc;
 
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.UnknownHostException;
 import java.security.KeyStore;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
 import javax.net.ssl.SSLSocket;
 import javax.net.ssl.SSLSocketFactory;
 import javax.net.ssl.TrustManagerFactory;
@@ -44,101 +36,168 @@ import org.apache.accumulo.core.client.impl.ClientExecReturn;
 import org.apache.accumulo.core.client.impl.ThriftTransportPool;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.trace.Span;
-import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.thrift.TServiceClient;
 import org.apache.thrift.TServiceClientFactory;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TMessage;
-import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSSLTransportFactory;
-import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
-import com.google.common.base.Preconditions;
 import com.google.common.net.HostAndPort;
 
+/**
+ * Factory methods for creating Thrift client objects
+ */
 public class ThriftUtil {
   private static final Logger log = Logger.getLogger(ThriftUtil.class);
 
-  public static class TraceProtocol extends TCompactProtocol {
-    private Span span = null;
-
-    @Override
-    public void writeMessageBegin(TMessage message) throws TException {
-      span = Trace.start("client:" + message.name);
-      super.writeMessageBegin(message);
-    }
+  private static final TraceProtocolFactory protocolFactory = new TraceProtocolFactory();
+  private static final TFramedTransport.Factory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE);
+  private static final Map<Integer,TTransportFactory> factoryCache = new HashMap<Integer,TTransportFactory>();
 
-    @Override
-    public void writeMessageEnd() throws TException {
-      super.writeMessageEnd();
-      span.stop();
-    }
-
-    public TraceProtocol(TTransport transport) {
-      super(transport);
-    }
+  /**
+   * An instance of {@link TraceProtocolFactory}
+   *
+   * @return The default Thrift TProtocolFactory for RPC
+   */
+  public static TProtocolFactory protocolFactory() {
+    return protocolFactory;
   }
 
-  public static class TraceProtocolFactory extends TCompactProtocol.Factory {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public TProtocol getProtocol(TTransport trans) {
-      return new TraceProtocol(trans);
-    }
+  /**
+   * An instance of {@link TFramedTransport.Factory}
+   *
+   * @return The default Thrift TTransportFactory for RPC
+   */
+  public static TTransportFactory transportFactory() {
+    return transportFactory;
   }
 
-  static private TProtocolFactory protocolFactory = new TraceProtocolFactory();
-  static private TTransportFactory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE);
-
-  static public <T extends TServiceClient> T createClient(TServiceClientFactory<T>
factory, TTransport transport) {
+  /**
+   * Create a Thrift client using the given factory and transport
+   */
+  public static <T extends TServiceClient> T createClient(TServiceClientFactory<T>
factory, TTransport transport) {
     return factory.getClient(protocolFactory.getProtocol(transport), protocolFactory.getProtocol(transport));
   }
 
-  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, HostAndPort address, ClientContext context) throws TTransportException {
-    return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address,
context));
+  /**
+   * Create a Thrift client using the given factory with a pooled transport (if available),
the address and client context
+   *
+   * @param factory
+   *          Thrift client factory
+   * @param address
+   *          Server address for client to connect to
+   * @param context
+   *          RPC options
+   */
+  public static <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, HostAndPort address, ClientContext context) throws TTransportException {
+    return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address.toString(),
context));
   }
 
-  static public <T extends TServiceClient> T getClientNoTimeout(TServiceClientFactory<T>
factory, String address, ClientContext context)
+  /**
+   * Create a Thrift client using the given factory with a pooled transport (if available),
the address, and client context with no timeout.
+   *
+   * @param factory
+   *          Thrift client factory
+   * @param address
+   *          Server address for client to connect to
+   * @param context
+   *          RPC options
+   */
+  public static <T extends TServiceClient> T getClientNoTimeout(TServiceClientFactory<T>
factory, String address, ClientContext context)
       throws TTransportException {
     return getClient(factory, address, context, 0);
   }
 
-  static public <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, String address, ClientContext context) throws TTransportException {
+  /**
+   * Create a Thrift client using the given factory with a pooled transport (if available),
the address and client context. Client timeout is extracted from the
+   * ClientContext
+   *
+   * @param factory
+   *          Thrift client factory
+   * @param address
+   *          Server address for client to connect to
+   * @param context
+   *          RPC options
+   */
+  public static <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, String address, ClientContext context) throws TTransportException {
     TTransport transport = ThriftTransportPool.getInstance().getTransport(address, context.getClientTimeoutInMillis(),
context);
     return createClient(factory, transport);
   }
 
-  static private <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, String address, ClientContext context, long timeout)
+  /**
+   * Create a Thrift client using the given factory with a pooled transport (if available)
using the address, client context and timeou
+   *
+   * @param factory
+   *          Thrift client factory
+   * @param address
+   *          Server address for client to connect to
+   * @param context
+   *          RPC options
+   * @param timeout
+   *          Socket timeout which overrides the ClientContext timeout
+   */
+  private static <T extends TServiceClient> T getClient(TServiceClientFactory<T>
factory, String address, ClientContext context, long timeout)
       throws TTransportException {
     TTransport transport = ThriftTransportPool.getInstance().getTransport(address, timeout,
context);
     return createClient(factory, transport);
   }
 
-  static public void returnClient(TServiceClient iface) { // Eew... the typing here is horrible
+  /**
+   * Return the transport used by the client to the shared pool.
+   *
+   * @param iface
+   *          The Client being returned or null.
+   */
+  public static void returnClient(TServiceClient iface) { // Eew... the typing here is horrible
     if (iface != null) {
       ThriftTransportPool.getInstance().returnTransport(iface.getInputProtocol().getTransport());
     }
   }
 
-  static public TabletClientService.Client getTServerClient(String address, ClientContext
context) throws TTransportException {
+  /**
+   * Create a TabletServer Thrift client
+   *
+   * @param address
+   *          Server address for client to connect to
+   * @param context
+   *          RPC options
+   */
+  public static TabletClientService.Client getTServerClient(String address, ClientContext
context) throws TTransportException {
     return getClient(new TabletClientService.Client.Factory(), address, context);
   }
 
-  static public TabletClientService.Client getTServerClient(String address, ClientContext
context, long timeout) throws TTransportException {
+  /**
+   * Create a TabletServer Thrift client
+   *
+   * @param address
+   *          Server address for client to connect to
+   * @param context
+   *          Options for connecting to the server
+   * @param timeout
+   *          Socket timeout which overrides the ClientContext timeout
+   */
+  public static TabletClientService.Client getTServerClient(String address, ClientContext
context, long timeout) throws TTransportException {
     return getClient(new TabletClientService.Client.Factory(), address, context, timeout);
   }
 
+  /**
+   * Execute the provided closure against a TabletServer at the given address. If a Thrift
transport exception occurs, the operation will be automatically
+   * retried.
+   *
+   * @param address
+   *          TabletServer address
+   * @param context
+   *          RPC options
+   * @param exec
+   *          The closure to execute
+   */
   public static void execute(String address, ClientContext context, ClientExec<TabletClientService.Client>
exec) throws AccumuloException,
       AccumuloSecurityException {
     while (true) {
@@ -160,6 +219,18 @@ public class ThriftUtil {
     }
   }
 
+  /**
+   * Execute the provided closure against the TabletServer at the given address, and return
the result of the closure to the client. If a Thrift transport
+   * exception occurs, the operation will be automatically retried.
+   *
+   * @param address
+   *          TabletServer address
+   * @param context
+   *          RPC options
+   * @param exec
+   *          Closure with a return value to execute
+   * @return The result from the closure
+   */
   public static <T> T execute(String address, ClientContext context, ClientExecReturn<T,TabletClientService.Client>
exec) throws AccumuloException,
       AccumuloSecurityException {
     while (true) {
@@ -181,19 +252,25 @@ public class ThriftUtil {
   }
 
   /**
-   * create a transport that is not pooled
+   * Create a transport that is not pooled
+   *
+   * @param address
+   *          Server address to open the transport to
+   * @param context
+   *          RPC options
    */
   public static TTransport createTransport(HostAndPort address, ClientContext context) throws
TException {
     return createClientTransport(address, (int) context.getClientTimeoutInMillis(), context.getClientSslParams());
   }
 
-  public static TTransportFactory transportFactory() {
-    return transportFactory;
-  }
-
-  private final static Map<Integer,TTransportFactory> factoryCache = new HashMap<Integer,TTransportFactory>();
-
-  synchronized public static TTransportFactory transportFactory(int maxFrameSize) {
+  /**
+   * Get an instance of the TTransportFactory with the provided maximum frame size
+   *
+   * @param maxFrameSize
+   *          Maximum Thrift message frame size
+   * @return A, possibly cached, TTransportFactory with the requested maximum frame size
+   */
+  public static synchronized TTransportFactory transportFactory(int maxFrameSize) {
     TTransportFactory factory = factoryCache.get(maxFrameSize);
     if (factory == null) {
       factory = new TFramedTransport.Factory(maxFrameSize);
@@ -202,47 +279,26 @@ public class ThriftUtil {
     return factory;
   }
 
-  synchronized public static TTransportFactory transportFactory(long maxFrameSize) {
+  /**
+   * @see #transportFactory(int)
+   */
+  public static synchronized TTransportFactory transportFactory(long maxFrameSize) {
     if (maxFrameSize > Integer.MAX_VALUE || maxFrameSize < 1)
       throw new RuntimeException("Thrift transport frames are limited to " + Integer.MAX_VALUE);
     return transportFactory((int) maxFrameSize);
   }
 
-  public static TProtocolFactory protocolFactory() {
-    return protocolFactory;
-  }
-
-  public static TServerSocket getServerSocket(int port, int timeout, InetAddress address,
SslConnectionParams params) throws TTransportException {
-    TServerSocket tServerSock;
-    if (params.useJsse()) {
-      tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(),
address);
-    } else {
-      tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address, params.getTTransportParams());
-    }
-
-    ServerSocket serverSock = tServerSock.getServerSocket();
-    if (serverSock instanceof SSLServerSocket) {
-      SSLServerSocket sslServerSock = (SSLServerSocket) serverSock;
-      String[] protocols = params.getServerProtocols();
-
-      // Be nice for the user and automatically remove protocols that might not exist in
their JVM. Keeps us from forcing config alterations too
-      // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6
-      Set<String> socketEnabledProtocols = new HashSet<String>(Arrays.asList(sslServerSock.getEnabledProtocols()));
-      // Keep only the enabled protocols that were specified by the configuration
-      socketEnabledProtocols.retainAll(Arrays.asList(protocols));
-      if (socketEnabledProtocols.isEmpty()) {
-        // Bad configuration...
-        throw new RuntimeException("No available protocols available for secure socket. Availaable
protocols: "
-            + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols:
" + Arrays.toString(protocols));
-      }
-
-      // Set the protocol(s) on the server socket
-      sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0]));
-    }
-
-    return tServerSock;
-  }
-
+  /**
+   * Create a TTransport for clients to the given address with the provided socket timeout
and session-layer configuration
+   *
+   * @param address
+   *          Server address to connect to
+   * @param timeout
+   *          Client socket timeout
+   * @param sslParams
+   *          RPC options for SSL servers
+   * @return An open TTransport which must be closed when finished
+   */
   public static TTransport createClientTransport(HostAndPort address, int timeout, SslConnectionParams
sslParams) throws TTransportException {
     boolean success = false;
     TTransport transport = null;
@@ -353,80 +409,4 @@ public class ThriftUtil {
       throw new TTransportException("Could not connect to " + host + " on port " + port,
e);
     }
   }
-
-  /**
-   * JDK6's SSLSocketFactory doesn't seem to properly set the protocols on the Sockets that
it creates which causes an SSLv2 client hello message during
-   * handshake, even when only TLSv1 is enabled. This only appears to be an issue on the
client sockets, not the server sockets.
-   *
-   * This class wraps the SSLSocketFactory ensuring that the Socket is properly configured.
-   * http://www.coderanch.com/t/637177/Security/Disabling-handshake-message-Java
-   *
-   * This class can be removed when JDK6 support is officially unsupported by Accumulo
-   */
-  private static class ProtocolOverridingSSLSocketFactory extends SSLSocketFactory {
-
-    private final SSLSocketFactory delegate;
-    private final String[] enabledProtocols;
-
-    public ProtocolOverridingSSLSocketFactory(final SSLSocketFactory delegate, final String[]
enabledProtocols) {
-      Preconditions.checkNotNull(enabledProtocols);
-      Preconditions.checkArgument(0 != enabledProtocols.length, "Expected at least one protocol");
-      this.delegate = delegate;
-      this.enabledProtocols = enabledProtocols;
-    }
-
-    @Override
-    public String[] getDefaultCipherSuites() {
-      return delegate.getDefaultCipherSuites();
-    }
-
-    @Override
-    public String[] getSupportedCipherSuites() {
-      return delegate.getSupportedCipherSuites();
-    }
-
-    @Override
-    public Socket createSocket(final Socket socket, final String host, final int port, final
boolean autoClose) throws IOException {
-      final Socket underlyingSocket = delegate.createSocket(socket, host, port, autoClose);
-      return overrideProtocol(underlyingSocket);
-    }
-
-    @Override
-    public Socket createSocket(final String host, final int port) throws IOException, UnknownHostException
{
-      final Socket underlyingSocket = delegate.createSocket(host, port);
-      return overrideProtocol(underlyingSocket);
-    }
-
-    @Override
-    public Socket createSocket(final String host, final int port, final InetAddress localAddress,
final int localPort) throws IOException, UnknownHostException {
-      final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
-      return overrideProtocol(underlyingSocket);
-    }
-
-    @Override
-    public Socket createSocket(final InetAddress host, final int port) throws IOException
{
-      final Socket underlyingSocket = delegate.createSocket(host, port);
-      return overrideProtocol(underlyingSocket);
-    }
-
-    @Override
-    public Socket createSocket(final InetAddress host, final int port, final InetAddress
localAddress, final int localPort) throws IOException {
-      final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
-      return overrideProtocol(underlyingSocket);
-    }
-
-    /**
-     * Set the {@link javax.net.ssl.SSLSocket#getEnabledProtocols() enabled protocols} to
{@link #enabledProtocols} if the <code>socket</code> is a
-     * {@link SSLSocket}
-     *
-     * @param socket
-     *          The Socket
-     */
-    private Socket overrideProtocol(final Socket socket) {
-      if (socket instanceof SSLSocket) {
-        ((SSLSocket) socket).setEnabledProtocols(enabledProtocols);
-      }
-      return socket;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3878f5f/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocol.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocol.java b/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocol.java
new file mode 100644
index 0000000..74aad57
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocol.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import org.apache.accumulo.core.trace.Span;
+import org.apache.accumulo.core.trace.Trace;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * TCompactProtocol implementation which automatically tracks tracing information
+ */
+public class TraceProtocol extends TCompactProtocol {
+  private Span span = null;
+
+  @Override
+  public void writeMessageBegin(TMessage message) throws TException {
+    span = Trace.start("client:" + message.name);
+    super.writeMessageBegin(message);
+  }
+
+  @Override
+  public void writeMessageEnd() throws TException {
+    super.writeMessageEnd();
+    span.stop();
+  }
+
+  public TraceProtocol(TTransport transport) {
+    super(transport);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3878f5f/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java b/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java
new file mode 100644
index 0000000..4591aa6
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * {@link TCompactProtocol.Factory} implementation which injects {@link TraceProtocol} instead
of {@link TCompactProtocol}
+ */
+public class TraceProtocolFactory extends TCompactProtocol.Factory {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public TProtocol getProtocol(TTransport trans) {
+    return new TraceProtocol(trans);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f3878f5f/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index d972b9a..210bcf5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -20,11 +20,17 @@ import java.lang.reflect.Field;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import javax.net.ssl.SSLServerSocket;
+
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.rpc.SslConnectionParams;
@@ -40,6 +46,8 @@ import org.apache.thrift.TProcessor;
 import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -47,9 +55,15 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.net.HostAndPort;
 
+/**
+ * Factory methods for creating Thrift server objects
+ */
 public class TServerUtils {
   private static final Logger log = LoggerFactory.getLogger(TServerUtils.class);
 
+  /**
+   * Static instance, passed to {@link ClientInfoProcessorFactory}, which will contain the
client address of any incoming RPC.
+   */
   public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
 
   /**
@@ -83,6 +97,7 @@ public class TServerUtils {
     boolean portSearch = false;
     if (portSearchProperty != null)
       portSearch = service.getConfiguration().getBoolean(portSearchProperty);
+
     // create the TimedProcessor outside the port search loop so we don't try to register
the same metrics mbean more than once
     TimedProcessor timedProcessor = new TimedProcessor(service.getConfiguration(), processor,
serverName, threadName);
     Random random = new Random();
@@ -135,9 +150,8 @@ public class TServerUtils {
     options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
     options.maxReadBufferBytes = maxMessageSize;
     options.stopTimeoutVal(5);
-    /*
-     * Create our own very special thread pool.
-     */
+
+    // Create our own very special thread pool.
     final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
     // periodically adjust the number of threads we need by checking how busy our threads
are
     SimpleTimer.getInstance(numSTThreads).schedule(new Runnable() {
@@ -171,7 +185,16 @@ public class TServerUtils {
     return new ServerAddress(new CustomNonBlockingServer(options), address);
   }
 
-  public static TServer createThreadPoolServer(TServerTransport transport, TProcessor processor)
{
+  /**
+   * Create a TThreadPoolServer with the given transport and processor
+   *
+   * @param transport
+   *          TServerTransport for the server
+   * @param processor
+   *          TProcessor for the server
+   * @return A configured TThreadPoolServer
+   */
+  public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor
processor) {
     TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
     options.protocolFactory(ThriftUtil.protocolFactory());
     options.transportFactory(ThriftUtil.transportFactory());
@@ -179,11 +202,70 @@ public class TServerUtils {
     return new TThreadPoolServer(options);
   }
 
+  /**
+   * Create the Thrift server socket for RPC running over SSL.
+   *
+   * @param port
+   *          Port of the server socket to bind to
+   * @param timeout
+   *          Socket timeout
+   * @param address
+   *          Address to bind the socket to
+   * @param params
+   *          SSL parameters
+   * @return A configured TServerSocket configured to use SSL
+   * @throws TTransportException
+   */
+  public static TServerSocket getSslServerSocket(int port, int timeout, InetAddress address,
SslConnectionParams params) throws TTransportException {
+    TServerSocket tServerSock;
+    if (params.useJsse()) {
+      tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(),
address);
+    } else {
+      tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address, params.getTTransportParams());
+    }
+
+    ServerSocket serverSock = tServerSock.getServerSocket();
+    if (serverSock instanceof SSLServerSocket) {
+      SSLServerSocket sslServerSock = (SSLServerSocket) serverSock;
+      String[] protocols = params.getServerProtocols();
+
+      // Be nice for the user and automatically remove protocols that might not exist in
their JVM. Keeps us from forcing config alterations too
+      // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6
+      Set<String> socketEnabledProtocols = new HashSet<String>(Arrays.asList(sslServerSock.getEnabledProtocols()));
+      // Keep only the enabled protocols that were specified by the configuration
+      socketEnabledProtocols.retainAll(Arrays.asList(protocols));
+      if (socketEnabledProtocols.isEmpty()) {
+        // Bad configuration...
+        throw new RuntimeException("No available protocols available for secure socket. Availaable
protocols: "
+            + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols:
" + Arrays.toString(protocols));
+      }
+
+      // Set the protocol(s) on the server socket
+      sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0]));
+    }
+
+    return tServerSock;
+  }
+
+  /**
+   * Create a Thrift SSL server.
+   *
+   * @param address
+   *          host and port to bind to
+   * @param processor
+   *          TProcessor for the server
+   * @param socketTimeout
+   *          Socket timeout
+   * @param sslParams
+   *          SSL parameters
+   * @return A ServerAddress with the bound-socket information and the Thrift server
+   * @throws TTransportException
+   */
   public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor,
long socketTimeout, SslConnectionParams sslParams)
       throws TTransportException {
     org.apache.thrift.transport.TServerSocket transport;
     try {
-      transport = ThriftUtil.getServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()),
sslParams);
+      transport = getSslServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()),
sslParams);
     } catch (UnknownHostException e) {
       throw new TTransportException(e);
     }
@@ -193,6 +275,9 @@ public class TServerUtils {
     return new ServerAddress(createThreadPoolServer(transport, processor), address);
   }
 
+  /**
+   * Create a Thrift server given the provided and Accumulo configuration.
+   */
   public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address,
TProcessor processor, String serverName, String threadName,
       int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize,
SslConnectionParams sslParams, long sslSocketTimeout)
       throws TTransportException {
@@ -240,7 +325,12 @@ public class TServerUtils {
     return serverAddress;
   }
 
-  // Existing connections will keep our thread running: reach in with reflection and insist
that they shutdown.
+  /**
+   * Stop a Thrift TServer. Existing connections will keep our thread running; use reflection
to forcibly shut down the threadpool.
+   *
+   * @param s
+   *          The TServer to stop
+   */
   public static void stopTServer(TServer s) {
     if (s == null)
       return;


Mime
View raw message