accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [3/3] accumulo git commit: ACCUMULO-4584 Remove oneway method checks
Date Wed, 22 Feb 2017 20:18:50 GMT
ACCUMULO-4584 Remove oneway method checks

Remove the unnecessary oneway method checking in RpcWrapper (no longer
needed with Thrift 0.10.0) and simplify RpcWrapper.

Provide a more comprehensive test of RpcWrapper behavior, using an
in-memory transport to check exception handling in both regular and
oneway Thrift service methods.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/86e6fb44
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/86e6fb44
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/86e6fb44

Branch: refs/heads/master
Commit: 86e6fb44bbf59109a61b159d2bd3bcd0fcfdfa9f
Parents: d5d18dd
Author: Christopher Tubbs <ctubbsii@apache.org>
Authored: Wed Feb 22 15:17:47 2017 -0500
Committer: Christopher Tubbs <ctubbsii@apache.org>
Committed: Wed Feb 22 15:17:47 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/accumulo/proxy/Proxy.java   |    2 +-
 .../apache/accumulo/server/rpc/RpcWrapper.java  |   70 +-
 .../accumulo/server/rpc/RpcWrapperTest.java     |  300 --
 .../accumulo/server/util/TServerUtilsTest.java  |    2 +-
 .../accumulo/gc/SimpleGarbageCollector.java     |    2 +-
 .../java/org/apache/accumulo/master/Master.java |    8 +-
 .../apache/accumulo/tserver/TabletServer.java   |   10 +-
 test/pom.xml                                    |   23 +
 test/src/main/findbugs/exclude-filter.xml       |    6 +
 .../org/apache/accumulo/test/rpc/Mocket.java    |  194 +
 .../test/rpc/SimpleThriftServiceHandler.java    |   63 +
 .../test/rpc/SimpleThriftServiceRunner.java     |   75 +
 .../accumulo/test/rpc/ThriftBehaviorIT.java     |  193 +
 .../test/rpc/thrift/SimpleThriftService.java    | 4103 ++++++++++++++++++
 test/src/main/scripts/generate-thrift.sh        |   24 +
 test/src/main/thrift/test.thrift                |   30 +
 16 files changed, 4728 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/86e6fb44/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
----------------------------------------------------------------------
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
index 74816bb..bd6af3a 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@ -206,7 +206,7 @@ public class Proxy implements KeywordExecutable {
     ProxyServer impl = new ProxyServer(properties);
 
     // Wrap the implementation -- translate some exceptions
-    AccumuloProxy.Iface wrappedImpl = RpcWrapper.service(impl, new AccumuloProxy.Processor<AccumuloProxy.Iface>(impl));
+    AccumuloProxy.Iface wrappedImpl = RpcWrapper.service(impl);
 
     // Create the processor from the implementation
     TProcessor processor = new AccumuloProxy.Processor<>(wrappedImpl);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/86e6fb44/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
index b942913..6fdbb04 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
@@ -19,19 +19,11 @@ package org.apache.accumulo.server.rpc;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
 
 import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler;
 import org.apache.accumulo.core.trace.wrappers.TraceWrap;
-import org.apache.thrift.ProcessFunction;
 import org.apache.thrift.TApplicationException;
-import org.apache.thrift.TBaseProcessor;
 import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This class accommodates the changes in THRIFT-1805, which appeared in Thrift 0.9.1 and restricts client-side notification of server-side errors to
@@ -49,23 +41,17 @@ import org.slf4j.LoggerFactory;
  * @since 1.6.1
  */
 public class RpcWrapper {
-  private static final Logger log = LoggerFactory.getLogger(RpcWrapper.class);
 
-  public static <I> I service(final I instance, final TBaseProcessor<I> processor) {
-    final Map<String,ProcessFunction<I,?>> processorView = processor.getProcessMapView();
-    final Set<String> onewayMethods = getOnewayMethods(processorView);
-    log.debug("Found oneway Thrift methods: " + onewayMethods);
-
-    InvocationHandler handler = getInvocationHandler(instance, onewayMethods);
+  public static <I> I service(final I instance) {
+    InvocationHandler handler = getInvocationHandler(instance);
 
     @SuppressWarnings("unchecked")
     I proxiedInstance = (I) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler);
     return proxiedInstance;
   }
 
-  protected static <T> RpcServerInvocationHandler<T> getInvocationHandler(final T instance, final Set<String> onewayMethods) {
+  protected static <T> RpcServerInvocationHandler<T> getInvocationHandler(final T instance) {
     return new RpcServerInvocationHandler<T>(instance) {
-      private final Logger log = LoggerFactory.getLogger(instance.getClass());
 
       @Override
       public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
@@ -73,56 +59,10 @@ public class RpcWrapper {
         try {
           return super.invoke(obj, method, args);
         } catch (RuntimeException e) {
-          String msg = e.getMessage();
-          log.error(msg, e);
-          if (onewayMethods.contains(method.getName())) {
-            throw e;
-          }
-          throw new TException(msg);
-        } catch (Error e) {
-          String msg = e.getMessage();
-          log.error(msg, e);
-          if (onewayMethods.contains(method.getName())) {
-            throw e;
-          }
-          throw new TException(msg);
+          // thrift will log the exception in ProcessFunction
+          throw new TException(e);
         }
       }
     };
   }
-
-  protected static Set<String> getOnewayMethods(Map<String,?> processorView) {
-    // Get a handle on the isOnewayMethod and make it accessible
-    final Method isOnewayMethod;
-    try {
-      isOnewayMethod = ProcessFunction.class.getDeclaredMethod("isOneway");
-    } catch (NoSuchMethodException e) {
-      throw new RuntimeException("Could not access isOneway method", e);
-    } catch (SecurityException e) {
-      throw new RuntimeException("Could not access isOneway method", e);
-    }
-    // In java7, this appears to be copying the method, but it's trivial for us to return the object to how it was before.
-    final boolean accessible = isOnewayMethod.isAccessible();
-    isOnewayMethod.setAccessible(true);
-
-    try {
-      final Set<String> onewayMethods = new HashSet<>();
-      for (Entry<String,?> entry : processorView.entrySet()) {
-        try {
-          if ((Boolean) isOnewayMethod.invoke(entry.getValue())) {
-            onewayMethods.add(entry.getKey());
-          }
-        } catch (RuntimeException e) {
-          throw e;
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      return onewayMethods;
-    } finally {
-      // Reset it back to how it was.
-      isOnewayMethod.setAccessible(accessible);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/86e6fb44/server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java
deleted file mode 100644
index 894acce..0000000
--- a/server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.rpc;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler;
-import org.apache.thrift.ProcessFunction;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TProtocol;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Sets;
-
-/**
- * Verification that RpcWrapper correctly mangles Exceptions to work around Thrift.
- */
-public class RpcWrapperTest {
-
-  private static final String RTE_MESSAGE = "RpcWrapperTest's RuntimeException Message";
-
-  /**
-   * Given a method name and whether or not the method is oneway, construct a ProcessFunction.
-   *
-   * @param methodName
-   *          The service method name.
-   * @param isOneway
-   *          Is the method oneway.
-   * @return A ProcessFunction.
-   */
-  private fake_proc<FakeService> createProcessFunction(String methodName, boolean isOneway)
{
-    return new fake_proc<>(methodName, isOneway);
-  }
-
-  @Test
-  public void testSomeOnewayMethods() {
-    Map<String,ProcessFunction<FakeService,?>> procs = new HashMap<>();
-    procs.put("foo", createProcessFunction("foo", true));
-    procs.put("foobar", createProcessFunction("foobar", false));
-    procs.put("bar", createProcessFunction("bar", true));
-    procs.put("barfoo", createProcessFunction("barfoo", false));
-
-    Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs);
-    Assert.assertEquals(Sets.newHashSet("foo", "bar"), onewayMethods);
-  }
-
-  @Test
-  public void testNoOnewayMethods() {
-    Map<String,ProcessFunction<FakeService,?>> procs = new HashMap<>();
-    procs.put("foo", createProcessFunction("foo", false));
-    procs.put("foobar", createProcessFunction("foobar", false));
-    procs.put("bar", createProcessFunction("bar", false));
-    procs.put("barfoo", createProcessFunction("barfoo", false));
-
-    Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs);
-    Assert.assertEquals(Collections.<String> emptySet(), onewayMethods);
-  }
-
-  @Test
-  public void testAllOnewayMethods() {
-    Map<String,ProcessFunction<FakeService,?>> procs = new HashMap<>();
-    procs.put("foo", createProcessFunction("foo", true));
-    procs.put("foobar", createProcessFunction("foobar", true));
-    procs.put("bar", createProcessFunction("bar", true));
-    procs.put("barfoo", createProcessFunction("barfoo", true));
-
-    Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs);
-    Assert.assertEquals(Sets.newHashSet("foo", "foobar", "bar", "barfoo"), onewayMethods);
-  }
-
-  @Test
-  public void testNoExceptionWrappingForOneway() throws Throwable {
-    final Object[] args = new Object[0];
-
-    final FakeService impl = new FakeServiceImpl();
-
-    // "short" names throw RTEs and are oneway, while long names do not throw exceptions
and are not oneway.
-    RpcServerInvocationHandler<FakeService> handler = RpcWrapper.getInvocationHandler(impl,
Sets.newHashSet("foo", "bar"));
-
-    // Should throw an exception, but not be wrapped because the method is oneway
-    try {
-      handler.invoke(impl, FakeServiceImpl.class.getMethod("foo"), args);
-      Assert.fail("Expected an exception");
-    } catch (RuntimeException e) {
-      Assert.assertEquals(RTE_MESSAGE, e.getMessage());
-    }
-
-    // Should not throw an exception
-    handler.invoke(impl, FakeServiceImpl.class.getMethod("foobar"), args);
-  }
-
-  @Test
-  public void testExceptionWrappingForNonOneway() throws Throwable {
-    final Object[] args = new Object[0];
-
-    final FakeService impl = new FakeServiceImpl();
-
-    // "short" names throw RTEs and are not oneway, while long names do not throw exceptions
and are oneway.
-    RpcServerInvocationHandler<FakeService> handler = RpcWrapper.getInvocationHandler(impl,
Sets.newHashSet("foobar", "barfoo"));
-
-    // Should throw an exception, but not be wrapped because the method is oneway
-    try {
-      handler.invoke(impl, FakeServiceImpl.class.getMethod("foo"), args);
-      Assert.fail("Expected an exception");
-    } catch (TException e) {
-      // The InvocationHandler should take the exception from the RTE and make it a TException
-      Assert.assertEquals(RTE_MESSAGE, e.getMessage());
-    }
-
-    // Should not throw an exception
-    handler.invoke(impl, FakeServiceImpl.class.getMethod("foobar"), args);
-  }
-
-  //
-  // Some hacked together classes/interfaces that mimic what Thrift is doing.
-  //
-
-  /**
-   * Some fake fields for our fake arguments.
-   */
-  private static class fake_fields implements org.apache.thrift.TFieldIdEnum {
-    @Override
-    public short getThriftFieldId() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public String getFieldName() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int hashCode() {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  /**
-   * A fake thrift service
-   */
-  interface FakeService {
-    void foo();
-
-    String foobar();
-
-    int bar();
-
-    long barfoo();
-  }
-
-  /**
-   * An implementation of the fake thrift service. The "short" names throw RTEs, while long
names do not.
-   */
-  public static class FakeServiceImpl implements FakeService {
-    @Override
-    public void foo() {
-      throw new RuntimeException(RTE_MESSAGE);
-    }
-
-    @Override
-    public String foobar() {
-      return "";
-    }
-
-    @Override
-    public int bar() {
-      throw new RuntimeException(RTE_MESSAGE);
-    }
-
-    @Override
-    public long barfoo() {
-      return 0;
-    }
-  }
-
-  /**
-   * A fake ProcessFunction implementation for testing that allows injection of method name
and oneway.
-   */
-  private static class fake_proc<I extends FakeService> extends org.apache.thrift.ProcessFunction<I,foo_args>
{
-    final private boolean isOneway;
-
-    public fake_proc(String methodName, boolean isOneway) {
-      super(methodName);
-      this.isOneway = isOneway;
-    }
-
-    @Override
-    protected boolean isOneway() {
-      return isOneway;
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public TBase getResult(I iface, foo_args args) throws TException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public foo_args getEmptyArgsInstance() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int hashCode() {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  /**
-   * Fake arguments for our fake service.
-   */
-  private static class foo_args implements org.apache.thrift.TBase<foo_args,fake_fields>
{
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public boolean equals(Object o) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int hashCode() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int compareTo(foo_args o) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void read(TProtocol iprot) throws TException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void write(TProtocol oprot) throws TException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public fake_fields fieldForId(int fieldId) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isSet(fake_fields field) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Object getFieldValue(fake_fields field) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void setFieldValue(fake_fields field, Object value) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public foo_args deepCopy() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void clear() {
-      throw new UnsupportedOperationException();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/86e6fb44/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
index bb0b177..e002c03 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
@@ -333,7 +333,7 @@ public class TServerUtilsTest {
   private ServerAddress startServer() throws Exception {
     AccumuloServerContext ctx = new AccumuloServerContext(factory);
     ClientServiceHandler clientHandler = new ClientServiceHandler(ctx, null, null);
-    Iface rpcProxy = RpcWrapper.service(clientHandler, new Processor<Iface>(clientHandler));
+    Iface rpcProxy = RpcWrapper.service(clientHandler);
     Processor<Iface> processor = new Processor<>(rpcProxy);
     // "localhost" explicitly to make sure we can always bind to that interface (avoids DNS
misconfiguration)
     String hostname = "localhost";

http://git-wip-us.apache.org/repos/asf/accumulo/blob/86e6fb44/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 5805272..1e8553b 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -701,7 +701,7 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements
Ifa
   }
 
   private HostAndPort startStatsService() throws UnknownHostException {
-    Iface rpcProxy = RpcWrapper.service(this, new Processor<Iface>(this));
+    Iface rpcProxy = RpcWrapper.service(this);
     final Processor<Iface> processor;
     if (ThriftServerType.SASL == getThriftServerType()) {
       Iface tcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass(), getConfiguration());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/86e6fb44/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index b926b52..c3443e7 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -125,9 +125,9 @@ import org.apache.accumulo.server.master.state.ZooStore;
 import org.apache.accumulo.server.master.state.ZooTabletStateStore;
 import org.apache.accumulo.server.metrics.Metrics;
 import org.apache.accumulo.server.replication.ZooKeeperInitialization;
+import org.apache.accumulo.server.rpc.HighlyAvailableServiceWrapper;
 import org.apache.accumulo.server.rpc.RpcWrapper;
 import org.apache.accumulo.server.rpc.ServerAddress;
-import org.apache.accumulo.server.rpc.HighlyAvailableServiceWrapper;
 import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftServerType;
@@ -1142,7 +1142,7 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
     clientHandler = new MasterClientServiceHandler(this);
     // Ensure that calls before the master gets the lock fail
     Iface haProxy = HighlyAvailableServiceWrapper.service(clientHandler, this);
-    Iface rpcProxy = RpcWrapper.service(haProxy, new Processor<Iface>(clientHandler));
+    Iface rpcProxy = RpcWrapper.service(haProxy);
     final Processor<Iface> processor;
     if (ThriftServerType.SASL == getThriftServerType()) {
       Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass(),
getConfiguration());
@@ -1158,8 +1158,8 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
     // Start the replication coordinator which assigns tservers to service replication requests
     MasterReplicationCoordinator impl = new MasterReplicationCoordinator(this);
     ReplicationCoordinator.Iface haReplicationProxy = HighlyAvailableServiceWrapper.service(impl,
this);
-    ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor = new ReplicationCoordinator.Processor<>(RpcWrapper.service(
-        impl, new ReplicationCoordinator.Processor<>(haReplicationProxy)));
+    ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor = new ReplicationCoordinator.Processor<>(
+        RpcWrapper.service(haReplicationProxy));
     ServerAddress replAddress = TServerUtils.startServer(this, hostname, Property.MASTER_REPLICATION_COORDINATOR_PORT,
replicationCoordinatorProcessor,
         "Master Replication Coordinator", "Replication Coordinator", null, Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
         Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/86e6fb44/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 15a9106..b1b2d72 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -18,6 +18,8 @@ package org.apache.accumulo.tserver;
 
 import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
 
 import java.io.IOException;
@@ -126,6 +128,7 @@ import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 import org.apache.accumulo.core.tabletserver.thrift.TDurability;
 import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
 import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
@@ -259,9 +262,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.net.HostAndPort;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
 
 public class TabletServer extends AccumuloServerContext implements Runnable {
 
@@ -2306,7 +2306,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable
{
   private HostAndPort startTabletClientService() throws UnknownHostException {
     // start listening for client connection last
     clientHandler = new ThriftClientHandler();
-    Iface rpcProxy = RpcWrapper.service(clientHandler, new Processor<Iface>(clientHandler));
+    Iface rpcProxy = RpcWrapper.service(clientHandler);
     final Processor<Iface> processor;
     if (ThriftServerType.SASL == getThriftServerType()) {
       Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, ThriftClientHandler.class,
getConfiguration());
@@ -2322,7 +2322,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable
{
 
   private HostAndPort startReplicationService() throws UnknownHostException {
     final ReplicationServicerHandler handler = new ReplicationServicerHandler(this);
-    ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler, new ReplicationServicer.Processor<ReplicationServicer.Iface>(handler));
+    ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler);
     ReplicationServicer.Iface repl = TCredentialsUpdatingWrapper.service(rpcProxy, handler.getClass(),
getConfiguration());
     ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<>(repl);
     AccumuloConfiguration conf = getServerConfigurationFactory().getConfiguration();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/86e6fb44/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
index 500d088..6e768a9 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -269,6 +269,29 @@
   </build>
   <profiles>
     <profile>
+      <id>thrift</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>exec-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>generate-thrift</id>
+                <goals>
+                  <goal>exec</goal>
+                </goals>
+                <phase>generate-sources</phase>
+                <configuration>
+                  <executable>${basedir}/src/main/scripts/generate-thrift.sh</executable>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
       <!-- create shaded test jar appropriate for running ITs on MapReduce -->
       <id>mrit</id>
       <activation>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/86e6fb44/test/src/main/findbugs/exclude-filter.xml
----------------------------------------------------------------------
diff --git a/test/src/main/findbugs/exclude-filter.xml b/test/src/main/findbugs/exclude-filter.xml
index e9acd09..464df82 100644
--- a/test/src/main/findbugs/exclude-filter.xml
+++ b/test/src/main/findbugs/exclude-filter.xml
@@ -16,6 +16,12 @@
 -->
 <FindBugsFilter>
   <Match>
+    <!-- ignore thrift-generated files -->
+    <Or>
+      <Package name="org.apache.accumulo.test.rpc.thrift" />
+    </Or>
+  </Match>
+  <Match>
     <!-- ignore intentional infinite loop in test main methods -->
     <Or>
       <Class name="org.apache.accumulo.test.stress.random.Write" />

http://git-wip-us.apache.org/repos/asf/accumulo/blob/86e6fb44/test/src/main/java/org/apache/accumulo/test/rpc/Mocket.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/rpc/Mocket.java b/test/src/main/java/org/apache/accumulo/test/rpc/Mocket.java
new file mode 100644
index 0000000..81c774d
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/rpc/Mocket.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.rpc;
+
+import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Mocket - a Mock Socket
+ * <p>
+ * Implements a bi-directional client-server transport in memory, using two FIFO queues. The output
+ * stream of the client is wired to the input stream of the server, and the output stream of the
+ * server is wired to the input stream of the client.
+ * <p>
+ * Both directions share a single "closed" flag: closing either transport closes the whole mocket.
+ */
+public class Mocket {
+
+  private final TTransport clientTransport;
+  private final TServerTransport serverTransport;
+  // shared by every transport created by this mocket
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  /**
+   * Creates the two queues and cross-wires them into a client transport and a server transport.
+   */
+  public Mocket() {
+    Buffer serverQueue = new Buffer();
+    Buffer clientQueue = new Buffer();
+    // wire up the two queues to each other
+    clientTransport = new MocketTransport(clientQueue, serverQueue);
+    serverTransport = new MocketServerTransport(new MocketTransport(serverQueue, clientQueue));
+  }
+
+  /**
+   * @return the server side of this mocket, whose accepted transport reads what the client writes
+   */
+  public TServerTransport getServerTransport() {
+    return serverTransport;
+  }
+
+  /**
+   * @return the client side of this mocket, which reads what the server writes
+   */
+  public TTransport getClientTransport() {
+    return clientTransport;
+  }
+
+  private boolean isMocketClosed() {
+    return closed.get();
+  }
+
+  private void closeMocket() {
+    closed.set(true);
+  }
+
+  /**
+   * One direction of the mocket: an unbounded FIFO queue of byte values.
+   * <p>
+   * Values are stored unsigned (0-255) so that a payload byte of {@code 0xFF} can never be confused
+   * with the {@code -1} end-of-stream marker returned by {@link #read()}.
+   */
+  private class Buffer {
+
+    private final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
+
+    /**
+     * Appends a single byte to the queue.
+     *
+     * @param b
+     *          the byte to write; only the low-order 8 bits are kept
+     */
+    public void write(int b) {
+      // mask off sign extension; a negative byte would otherwise be queued as a negative int and
+      // 0xFF would be read back as -1, which readers interpret as end-of-stream
+      queue.add(b & 0xFF);
+    }
+
+    /**
+     * Appends {@code len} bytes of {@code buf}, starting at {@code off}, to the queue.
+     *
+     * @throws NullPointerException
+     *           if {@code buf} is null
+     * @throws IndexOutOfBoundsException
+     *           if {@code off} and {@code len} do not describe a valid range of {@code buf}
+     */
+    public void write(byte[] buf, int off, int len) {
+      Objects.requireNonNull(buf);
+      Preconditions.checkPositionIndexes(off, off + len, buf.length);
+      if (len == 0) {
+        return;
+      }
+      for (int i = 0; i < len; i++) {
+        write(buf[off + i]);
+      }
+    }
+
+    /**
+     * Reads the next byte, waiting until one is available.
+     *
+     * @return the next byte as a value in the range 0-255, or -1 if the mocket is closed or the
+     *         calling thread is interrupted while waiting (in which case the mocket is closed and
+     *         the interrupt flag is restored)
+     */
+    public int read() {
+      Integer item;
+      // poll with a short timeout instead of take() so that we notice when the mocket is closed
+      // and don't block indefinitely
+      while (!isMocketClosed()) {
+        try {
+          item = queue.poll(10, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+          // reset interrupt flag before returning
+          Thread.currentThread().interrupt();
+          closeMocket();
+          return -1;
+        }
+        // null means the timeout was reached
+        if (item != null) {
+          return item;
+        }
+      }
+      return -1;
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code buf}, starting at {@code off}. Keeps reading until
+     * {@code len} bytes have been stored or {@link #read()} reports end-of-stream.
+     *
+     * @return the number of bytes stored, 0 if {@code len} is 0, or -1 if end-of-stream was hit
+     *         before any byte could be read
+     * @throws NullPointerException
+     *           if {@code buf} is null
+     * @throws IndexOutOfBoundsException
+     *           if {@code off} and {@code len} do not describe a valid range of {@code buf}
+     */
+    public int read(byte[] buf, int off, int len) {
+      Objects.requireNonNull(buf);
+      Preconditions.checkPositionIndexes(off, off + len, buf.length);
+      if (len == 0) {
+        return 0;
+      }
+      int c = read();
+      if (c == -1) {
+        return -1;
+      }
+      buf[off] = (byte) c;
+
+      int i;
+      for (i = 1; i < len; i++) {
+        c = read();
+        if (c == -1) {
+          break;
+        }
+        buf[off + i] = (byte) c;
+      }
+      return i;
+    }
+
+  }
+
+  /**
+   * Server transport that always "accepts" the single, pre-wired server-side transport.
+   */
+  private class MocketServerTransport extends TServerTransport {
+
+    private final MocketTransport servTrans;
+
+    public MocketServerTransport(MocketTransport mocketTransport) {
+      servTrans = mocketTransport;
+    }
+
+    @Override
+    public void listen() throws TTransportException {}
+
+    @Override
+    public void close() {
+      acceptImpl().close();
+    }
+
+    @Override
+    protected TTransport acceptImpl() {
+      return servTrans;
+    }
+
+    @Override
+    public void interrupt() {
+      // interrupting the server is the same as closing the mocket; pending reads then return -1
+      close();
+    }
+
+  }
+
+  /**
+   * One end of the mocket: reads from its input buffer and writes to its output buffer.
+   */
+  private class MocketTransport extends TTransport {
+
+    private final Buffer input;
+    private final Buffer output;
+
+    private MocketTransport(Buffer input, Buffer output) {
+      this.input = input;
+      this.output = output;
+    }
+
+    @Override
+    public void write(byte[] buf, int off, int len) throws TTransportException {
+      output.write(buf, off, len);
+    }
+
+    @Override
+    public int read(byte[] buf, int off, int len) throws TTransportException {
+      return input.read(buf, off, len);
+    }
+
+    @Override
+    public void open() throws TTransportException {}
+
+    @Override
+    public boolean isOpen() {
+      return !isMocketClosed();
+    }
+
+    @Override
+    public void close() {
+      closeMocket();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/86e6fb44/test/src/main/java/org/apache/accumulo/test/rpc/SimpleThriftServiceHandler.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/rpc/SimpleThriftServiceHandler.java b/test/src/main/java/org/apache/accumulo/test/rpc/SimpleThriftServiceHandler.java
new file mode 100644
index 0000000..02db55c
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/rpc/SimpleThriftServiceHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.rpc;
+
+import org.apache.accumulo.test.rpc.thrift.SimpleThriftService;
+import org.apache.thrift.TException;
+
+public class SimpleThriftServiceHandler implements SimpleThriftService.Iface {
+
+  private void setProp(String method, String value) {
+    System.setProperty(this.getClass().getSimpleName() + "." + method, value);
+  }
+
+  @Override
+  public String echoFail(String value) throws TException {
+    setProp("echoFail", value);
+    throw new TException(new UnsupportedOperationException(value));
+  }
+
+  @Override
+  public String echoRuntimeFail(String value) {
+    setProp("echoRuntimeFail", value);
+    throw new UnsupportedOperationException(value);
+  }
+
+  @Override
+  public String echoPass(String value) {
+    setProp("echoPass", value);
+    return value;
+  }
+
+  @Override
+  public void onewayFail(String value) throws TException {
+    setProp("onewayFail", value);
+    throw new TException(new UnsupportedOperationException(value));
+  }
+
+  @Override
+  public void onewayRuntimeFail(String value) {
+    setProp("onewayRuntimeFail", value);
+    throw new UnsupportedOperationException(value);
+  }
+
+  @Override
+  public void onewayPass(String value) {
+    setProp("onewayPass", value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/86e6fb44/test/src/main/java/org/apache/accumulo/test/rpc/SimpleThriftServiceRunner.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/rpc/SimpleThriftServiceRunner.java b/test/src/main/java/org/apache/accumulo/test/rpc/SimpleThriftServiceRunner.java
new file mode 100644
index 0000000..f2865c2
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/rpc/SimpleThriftServiceRunner.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.rpc;
+
+import org.apache.accumulo.server.rpc.RpcWrapper;
+import org.apache.accumulo.test.rpc.thrift.SimpleThriftService;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TSimpleServer;
+
+/**
+ * A utility for starting a simple thrift server, and providing a corresponding client for
+ * in-memory testing of thrift behavior
+ */
+public class SimpleThriftServiceRunner {
+
+  private final SimpleThriftServiceHandler handler = new SimpleThriftServiceHandler();
+
+  private final Mocket mocket;
+  private final Thread serviceThread;
+  private final TServer server;
+
+  /**
+   * Builds the in-memory transport, the server and the (not yet started) thread that will run it.
+   *
+   * @param threadName
+   *          name given to the thread that runs the server
+   * @param useWrapper
+   *          if true, the handler is passed through {@link RpcWrapper} before being given to the
+   *          processor
+   */
+  public SimpleThriftServiceRunner(String threadName, boolean useWrapper) {
+    this.mocket = new Mocket();
+    this.server = createServer(useWrapper);
+    this.serviceThread = new Thread(server::serve, threadName);
+  }
+
+  /** Starts the thread that serves requests. */
+  public void startService() {
+    serviceThread.start();
+  }
+
+  /**
+   * @return the raw (unwrapped) handler instance backing the service
+   */
+  public SimpleThriftServiceHandler handler() {
+    return handler;
+  }
+
+  /**
+   * @return a new binary-protocol client attached to the client side of the in-memory transport
+   */
+  public SimpleThriftService.Client client() {
+    TBinaryProtocol protocol = new TBinaryProtocol(mocket.getClientTransport());
+    return new SimpleThriftService.Client(protocol);
+  }
+
+  /** Assembles a {@link TSimpleServer} on the server side of the in-memory transport. */
+  private TServer createServer(boolean useWrapper) {
+    SimpleThriftService.Iface actualHandler =
+        useWrapper ? RpcWrapper.<SimpleThriftService.Iface> service(handler) : handler;
+    TServer.Args serverArgs = new TServer.Args(mocket.getServerTransport());
+    serverArgs.processor(new SimpleThriftService.Processor<>(actualHandler));
+    serverArgs.protocolFactory(new TBinaryProtocol.Factory());
+    return new TSimpleServer(serverArgs);
+  }
+
+  /** Asks the server to stop and waits for the serving thread to finish. */
+  public void stopService() {
+    server.stop();
+    try {
+      serviceThread.join();
+    } catch (InterruptedException e) {
+      // re-set interrupt flag
+      Thread.currentThread().interrupt();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/86e6fb44/test/src/main/java/org/apache/accumulo/test/rpc/ThriftBehaviorIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/rpc/ThriftBehaviorIT.java b/test/src/main/java/org/apache/accumulo/test/rpc/ThriftBehaviorIT.java
new file mode 100644
index 0000000..ee6fa08
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/rpc/ThriftBehaviorIT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.rpc;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.test.categories.SunnyDayTests;
+import org.apache.accumulo.test.rpc.thrift.SimpleThriftService;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.thrift.ProcessFunction;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.apache.thrift.server.TSimpleServer;
+import org.hamcrest.core.IsInstanceOf;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+/**
+ * Exercises regular and oneway thrift service methods, both directly on the handler (tests whose
+ * names end in "Handler") and through a thrift client over an in-memory transport, checking how
+ * exceptions thrown by the handler surface.
+ */
+@Category(SunnyDayTests.class)
+public class ThriftBehaviorIT {
+
+  private static final String KITTY_MSG = "🐈 Kitty! 🐈";
+
+  // suffix of test methods that call the handler directly rather than through the client
+  private static final String HANDLER_SUFFIX = "Handler";
+
+  // can delete wrapper when tests pass without using it (assuming tests are good enough)
+  private static final boolean USE_RPC_WRAPPER = true;
+
+  private static final boolean SUPPRESS_SPAMMY_LOGGERS = true;
+
+  @Rule
+  public Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  private SimpleThriftService.Client client;
+  private SimpleThriftServiceHandler handler;
+  private SimpleThriftServiceRunner serviceRunner;
+  private String propName;
+  private final Map<Logger,Level> oldLogLevels = new HashMap<>();
+
+  @Before
+  public void createClientAndServer() {
+    // remember current levels of the noisy loggers so they can be restored after the test
+    for (Class<?> spammyClass : Arrays.<Class<?>> asList(TSimpleServer.class, ProcessFunction.class)) {
+      Logger spammyLogger = Logger.getLogger(spammyClass);
+      oldLogLevels.put(spammyLogger, spammyLogger.getLevel());
+      if (SUPPRESS_SPAMMY_LOGGERS) {
+        spammyLogger.setLevel(Level.OFF);
+      }
+    }
+
+    String methodName = testName.getMethodName();
+    serviceRunner = new SimpleThriftServiceRunner(ThriftBehaviorIT.class.getSimpleName() + "." + methodName, USE_RPC_WRAPPER);
+    serviceRunner.startService();
+    client = serviceRunner.client();
+    handler = serviceRunner.handler();
+
+    // the property written by the handler is keyed by service method name, which is the test
+    // method name minus any "Handler" suffix
+    String serviceMethod = methodName;
+    if (serviceMethod.endsWith(HANDLER_SUFFIX)) {
+      serviceMethod = serviceMethod.substring(0, serviceMethod.length() - HANDLER_SUFFIX.length());
+    }
+    propName = SimpleThriftServiceHandler.class.getSimpleName() + "." + serviceMethod;
+
+    // make sure the property is reset before the test runs
+    System.setProperty(propName, "-");
+    Assert.assertEquals("-", System.getProperty(propName));
+  }
+
+  @After
+  public void shutdownServer() {
+    serviceRunner.stopService();
+
+    for (Map.Entry<Logger,Level> saved : oldLogLevels.entrySet()) {
+      saved.getKey().setLevel(saved.getValue());
+    }
+
+    // make sure the method was actually executed by the service handler
+    Assert.assertEquals(KITTY_MSG, System.getProperty(propName));
+  }
+
+  /** Asserts that the given exception is exactly a {@link TApplicationException}. */
+  private static void assertIsApplicationException(Exception e) {
+    Assert.assertEquals(TApplicationException.class.getName(), e.getClass().getName());
+  }
+
+  @Test
+  public void echoFailHandler() throws TException {
+    exception.expect(TException.class);
+    exception.expectCause(IsInstanceOf.instanceOf(UnsupportedOperationException.class));
+    handler.echoFail(KITTY_MSG);
+  }
+
+  @Test
+  public void echoFail() throws TException {
+    try {
+      client.echoFail(KITTY_MSG);
+      Assert.fail("Thrift client did not throw an expected exception");
+    } catch (Exception e) {
+      assertIsApplicationException(e);
+    }
+    // verify normal two-way method still passes using same client
+    echoPass();
+  }
+
+  @Test
+  public void echoRuntimeFailHandler() throws TException {
+    exception.expect(UnsupportedOperationException.class);
+    handler.echoRuntimeFail(KITTY_MSG);
+  }
+
+  @Test
+  public void echoRuntimeFail() throws TException {
+    try {
+      client.echoRuntimeFail(KITTY_MSG);
+      Assert.fail("Thrift client did not throw an expected exception");
+    } catch (Exception e) {
+      assertIsApplicationException(e);
+    }
+    // verify normal two-way method still passes using same client
+    echoPass();
+  }
+
+  @Test
+  public void echoPassHandler() {
+    String echoed = handler.echoPass(KITTY_MSG);
+    Assert.assertEquals(KITTY_MSG, echoed);
+  }
+
+  @Test
+  public void echoPass() throws TException {
+    String echoed = client.echoPass(KITTY_MSG);
+    Assert.assertEquals(KITTY_MSG, echoed);
+  }
+
+  @Test
+  public void onewayFailHandler() throws TException {
+    exception.expect(TException.class);
+    exception.expectCause(IsInstanceOf.instanceOf(UnsupportedOperationException.class));
+    handler.onewayFail(KITTY_MSG);
+  }
+
+  @Test
+  public void onewayFail() throws TException {
+    client.onewayFail(KITTY_MSG);
+    // verify normal two-way method still passes using same client
+    echoPass();
+  }
+
+  @Test
+  public void onewayRuntimeFailHandler() throws TException {
+    exception.expect(UnsupportedOperationException.class);
+    handler.onewayRuntimeFail(KITTY_MSG);
+  }
+
+  @Test
+  public void onewayRuntimeFail() throws TException {
+    client.onewayRuntimeFail(KITTY_MSG);
+    // verify normal two-way method still passes using same client
+    echoPass();
+  }
+
+  @Test
+  public void onewayPassHandler() {
+    handler.onewayPass(KITTY_MSG);
+  }
+
+  @Test
+  public void onewayPass() throws TException {
+    client.onewayPass(KITTY_MSG);
+    // verify normal two-way method still passes using same client
+    echoPass();
+  }
+
+}


Mime
View raw message