accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [01/23] accumulo git commit: ACCUMULO-4065 Work around TExceptions being written back to clients in oneway methods.
Date Wed, 02 Dec 2015 02:38:18 GMT
Repository: accumulo
Updated Branches:
  refs/heads/1.6 e82c3f3da -> 5724df591
  refs/heads/1.7 1853d088e -> e35e1898d
  refs/heads/master 2b6e68b6a -> 7dc373675


ACCUMULO-4065 Work around TExceptions being written back to clients in oneway methods.

Oneway methods must not throw a TException, otherwise Thrift sends a message back to
the client which causes problems for the client which next uses that connection (and
finds a message still sitting on the "wire").

Closes apache/accumulo#56


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/44b17c63
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/44b17c63
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/44b17c63

Branch: refs/heads/1.6
Commit: 44b17c639f4e5b617196f26a8f1ff56ce475221b
Parents: e82c3f3
Author: Josh Elser <elserj@apache.org>
Authored: Tue Dec 1 15:54:50 2015 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Tue Dec 1 15:54:50 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/accumulo/proxy/Proxy.java   |   4 +-
 .../apache/accumulo/server/util/RpcWrapper.java |  74 +++++-
 .../accumulo/server/util/RpcWrapperTest.java    | 261 +++++++++++++++++++
 .../accumulo/gc/SimpleGarbageCollector.java     |   2 +-
 .../java/org/apache/accumulo/master/Master.java |   3 +-
 .../apache/accumulo/tserver/TabletServer.java   |   3 +-
 6 files changed, 337 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/44b17c63/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
----------------------------------------------------------------------
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
index f3ec13b..3368d20 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.proxy.thrift.AccumuloProxy;
 import org.apache.accumulo.server.util.RpcWrapper;
 import org.apache.log4j.Logger;
+import org.apache.thrift.TBaseProcessor;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
@@ -135,7 +136,8 @@ public class Proxy {
     @SuppressWarnings("unchecked")
     Constructor<? extends TProcessor> proxyProcConstructor = (Constructor<? extends
TProcessor>) proxyProcClass.getConstructor(proxyIfaceClass);
 
-    final TProcessor processor = proxyProcConstructor.newInstance(RpcWrapper.service(impl));
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    final TProcessor processor = proxyProcConstructor.newInstance(RpcWrapper.service(impl,
((TBaseProcessor) proxyProcConstructor.newInstance(impl)).getProcessMapView()));
 
     THsHaServer.Args args = new THsHaServer.Args(socket);
     args.processor(processor);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/44b17c63/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java
index 1f30b07..aafe37c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java
@@ -19,10 +19,16 @@ package org.apache.accumulo.server.util;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.accumulo.trace.instrument.thrift.RpcServerInvocationHandler;
 import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
+import org.apache.thrift.ProcessFunction;
 import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,33 +39,89 @@ import org.slf4j.LoggerFactory;
  * a network issue, but informs the client that a {@link TApplicationException} had occurred,
as it did in Thrift 0.9.0. This performs similar functions as
  * {@link TraceWrap}, but with the additional action of translating exceptions. See also
ACCUMULO-1691 and ACCUMULO-2950.
  *
+ * ACCUMULO-4065 found that the above exception-wrapping is not appropriate for Thrift's
implementation of oneway methods. Oneway methods are defined as
+ * a method which the client does not wait for it to return. Normally, this is acceptable
as these methods are void. Therefore, if another client reuses
+ * the connection to send a new RPC, there is no "extra" data sitting on the InputStream
from the Socket (that the server sent). However, the implementation
+ * of a oneway method <em>does</em> send a response to the client when the implementation
throws a {@link TException}. This message showing up on the client's
+ * InputStream causes future use of the Thrift Connection to become unusable. As long as
the Thrift implementation sends a message back when oneway methods
+ * throw a {@link TException}, we much make sure that we don't re-wrap-and-throw any exceptions
as {@link TException}s.
+ *
  * @since 1.6.1
  */
 public class RpcWrapper {
+  private static final Logger log = LoggerFactory.getLogger(RpcWrapper.class);
+
+  public static <T> T service(final T instance, @SuppressWarnings("rawtypes") final
Map<String,ProcessFunction<T,? extends TBase>> processorView) {
+    final Set<String> onewayMethods = getOnewayMethods(processorView);
+    log.debug("Found oneway Thrift methods: " + onewayMethods);
+
+    InvocationHandler handler = getInvocationHandler(instance, onewayMethods);
+
+    @SuppressWarnings("unchecked")
+    T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(),
instance.getClass().getInterfaces(), handler);
+    return proxiedInstance;
+  }
 
-  public static <T> T service(final T instance) {
-    InvocationHandler handler = new RpcServerInvocationHandler<T>(instance) {
+  protected static <T> RpcServerInvocationHandler<T> getInvocationHandler(final
T instance, final Set<String> onewayMethods) {
+    return new RpcServerInvocationHandler<T>(instance) {
       private final Logger log = LoggerFactory.getLogger(instance.getClass());
 
       @Override
       public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
+        // e.g. ThriftClientHandler.flush(TInfo, TCredentials, ...)
         try {
           return super.invoke(obj, method, args);
         } catch (RuntimeException e) {
           String msg = e.getMessage();
           log.error(msg, e);
+          if (onewayMethods.contains(method.getName())) {
+            throw e;
+          }
           throw new TException(msg);
         } catch (Error e) {
           String msg = e.getMessage();
           log.error(msg, e);
+          if (onewayMethods.contains(method.getName())) {
+            throw e;
+          }
           throw new TException(msg);
         }
       }
     };
-
-    @SuppressWarnings("unchecked")
-    T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(),
instance.getClass().getInterfaces(), handler);
-    return proxiedInstance;
   }
 
+  protected static <T> Set<String> getOnewayMethods(@SuppressWarnings("rawtypes")
Map<String,ProcessFunction<T,? extends TBase>> processorView) {
+    // Get a handle on the isOnewayMethod and make it accessible
+    final Method isOnewayMethod;
+    try {
+      isOnewayMethod = ProcessFunction.class.getDeclaredMethod("isOneway");
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException("Could not access isOneway method", e);
+    } catch (SecurityException e) {
+      throw new RuntimeException("Could not access isOneway method", e);
+    }
+    // In java7, this appears to be copying the method, but it's trivial for us to return
the object to how it was before.
+    final boolean accessible = isOnewayMethod.isAccessible();
+    isOnewayMethod.setAccessible(true);
+
+    try {
+      final Set<String> onewayMethods = new HashSet<String>();
+      for (@SuppressWarnings("rawtypes") Entry<String,ProcessFunction<T,? extends TBase>>
entry : processorView.entrySet()) {
+        try {
+          if ((Boolean) isOnewayMethod.invoke(entry.getValue())) {
+            onewayMethods.add(entry.getKey());
+          }
+        } catch (RuntimeException e) {
+          throw e;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      return onewayMethods;
+    } finally {
+      // Reset it back to how it was.
+      isOnewayMethod.setAccessible(accessible);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/44b17c63/server/base/src/test/java/org/apache/accumulo/server/util/RpcWrapperTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/RpcWrapperTest.java
b/server/base/src/test/java/org/apache/accumulo/server/util/RpcWrapperTest.java
new file mode 100644
index 0000000..63a836f
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/RpcWrapperTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.trace.instrument.thrift.RpcServerInvocationHandler;
+import org.apache.thrift.ProcessFunction;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Verification that RpcWrapper correctly mangles Exceptions to work around Thrift.
+ */
+public class RpcWrapperTest {
+
+  private static final String RTE_MESSAGE = "RpcWrapperTest's RuntimeException Message";
+  /**
+   * Given a method name and whether or not the method is oneway, construct a ProcessFunction.
+   *
+   * @param methodName The service method name.
+   * @param isOneway Is the method oneway.
+   * @return A ProcessFunction.
+   */
+  private fake_proc<FakeService> createProcessFunction(String methodName, boolean isOneway)
{
+    return new fake_proc<FakeService>(methodName, isOneway);
+  }
+
+  @Test
+  public void testSomeOnewayMethods() {
+    @SuppressWarnings("rawtypes")
+    Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new HashMap<String,ProcessFunction<FakeService,?
extends TBase>>();
+    procs.put("foo", createProcessFunction("foo", true));
+    procs.put("foobar", createProcessFunction("foobar", false));
+    procs.put("bar", createProcessFunction("bar", true));
+    procs.put("barfoo", createProcessFunction("barfoo", false));
+
+    Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs);
+    Assert.assertEquals(Sets.newHashSet("foo", "bar"), onewayMethods);
+  }
+
+  @Test
+  public void testNoOnewayMethods() {
+    @SuppressWarnings("rawtypes")
+    Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new HashMap<String,ProcessFunction<FakeService,?
extends TBase>>();
+    procs.put("foo", createProcessFunction("foo", false));
+    procs.put("foobar", createProcessFunction("foobar", false));
+    procs.put("bar", createProcessFunction("bar", false));
+    procs.put("barfoo", createProcessFunction("barfoo", false));
+
+    Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs);
+    Assert.assertEquals(Collections.<String> emptySet(), onewayMethods);
+  }
+
+  @Test
+  public void testAllOnewayMethods() {
+    @SuppressWarnings("rawtypes")
+    Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new HashMap<String,ProcessFunction<FakeService,?
extends TBase>>();
+    procs.put("foo", createProcessFunction("foo", true));
+    procs.put("foobar", createProcessFunction("foobar", true));
+    procs.put("bar", createProcessFunction("bar", true));
+    procs.put("barfoo", createProcessFunction("barfoo", true));
+
+    Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs);
+    Assert.assertEquals(Sets.newHashSet("foo", "foobar", "bar", "barfoo"), onewayMethods);
+  }
+
+  @Test
+  public void testNoExceptionWrappingForOneway() throws Throwable {
+    final Object[] args = new Object[0];
+
+    final FakeService impl = new FakeServiceImpl();
+
+    // "short" names throw RTEs and are oneway, while long names do not throw exceptions
and are not oneway.
+    RpcServerInvocationHandler<FakeService> handler = RpcWrapper.getInvocationHandler(impl,
Sets.newHashSet("foo", "bar"));
+
+    // Should throw an exception, but not be wrapped because the method is oneway
+    try {
+      handler.invoke(impl, FakeServiceImpl.class.getMethod("foo"), args);
+      Assert.fail("Expected an exception");
+    } catch (RuntimeException e) {
+      Assert.assertEquals(RTE_MESSAGE, e.getMessage());
+    }
+
+    // Should not throw an exception
+    handler.invoke(impl, FakeServiceImpl.class.getMethod("foobar"), args);
+  }
+
+  @Test
+  public void testExceptionWrappingForNonOneway() throws Throwable {
+    final Object[] args = new Object[0];
+
+    final FakeService impl = new FakeServiceImpl();
+
+    // "short" names throw RTEs and are not oneway, while long names do not throw exceptions
and are oneway.
+    RpcServerInvocationHandler<FakeService> handler = RpcWrapper.getInvocationHandler(impl,
Sets.newHashSet("foobar", "barfoo"));
+
+    // Should throw an exception, but not be wrapped because the method is oneway
+    try {
+      handler.invoke(impl, FakeServiceImpl.class.getMethod("foo"), args);
+      Assert.fail("Expected an exception");
+    } catch (TException e) {
+      // The InvocationHandler should take the exception from the RTE and make it a TException
+      Assert.assertEquals(RTE_MESSAGE, e.getMessage());
+    }
+
+    // Should not throw an exception
+    handler.invoke(impl, FakeServiceImpl.class.getMethod("foobar"), args);
+  }
+
+  //
+  // Some hacked together classes/interfaces that mimic what Thrift is doing.
+  //
+
+  /**
+   * Some fake fields for our fake arguments.
+   */
+  private static class fake_fields implements org.apache.thrift.TFieldIdEnum {
+    @Override
+    public short getThriftFieldId() {
+      return 0;
+    }
+
+    @Override
+    public String getFieldName() {
+      return null;
+    }
+  }
+
+  /**
+   * A fake thrift service
+   */
+  interface FakeService {
+    void foo();
+    String foobar();
+    int bar();
+    long barfoo();
+  }
+
+  /**
+   * An implementation of the fake thrift service. The "short" names throw RTEs, while long
names do not.
+   */
+  public static class FakeServiceImpl implements FakeService {
+    @Override
+    public void foo() {
+      throw new RuntimeException(RTE_MESSAGE);
+    }
+
+    @Override
+    public String foobar() {
+      return "";
+    }
+
+    @Override
+    public int bar() {
+      throw new RuntimeException(RTE_MESSAGE);
+    }
+
+    @Override
+    public long barfoo() {
+      return 0;
+    }
+  };
+
+  /**
+   * A fake ProcessFunction implementation for testing that allows injection of method name
and oneway.
+   */
+  private static class fake_proc<I extends FakeService> extends org.apache.thrift.ProcessFunction<I,
foo_args> {
+    final private boolean isOneway;
+
+    public fake_proc(String methodName, boolean isOneway) {
+      super(methodName);
+      this.isOneway = isOneway;
+    }
+
+    @Override
+    protected boolean isOneway() {
+      return isOneway;
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public TBase getResult(I iface, foo_args args) throws TException {
+      return null;
+    }
+
+    @Override
+    public foo_args getEmptyArgsInstance() {
+      return null;
+    }
+  }
+
+  /**
+   * Fake arguments for our fake service.
+   */
+  private static class foo_args implements org.apache.thrift.TBase<foo_args, fake_fields>
{
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public int compareTo(foo_args o) {
+      return 0;
+    }
+
+    @Override
+    public void read(TProtocol iprot) throws TException {
+    }
+
+    @Override
+    public void write(TProtocol oprot) throws TException {
+    }
+
+    @Override
+    public fake_fields fieldForId(int fieldId) {
+      return null;
+    }
+
+    @Override
+    public boolean isSet(fake_fields field) {
+      return false;
+    }
+
+    @Override
+    public Object getFieldValue(fake_fields field) {
+      return null;
+    }
+
+    @Override
+    public void setFieldValue(fake_fields field, Object value) {}
+
+    @Override
+    public TBase<foo_args,fake_fields> deepCopy() {
+      return null;
+    }
+
+    @Override
+    public void clear() {}
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/44b17c63/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 3ab95a7..b4afda8 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -702,7 +702,7 @@ public class SimpleGarbageCollector implements Iface {
   }
 
   private HostAndPort startStatsService() throws UnknownHostException {
-    Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(this));
+    Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(this,
new Processor<Iface>(this).getProcessMapView()));
     int port = config.getPort(Property.GC_PORT);
     long maxMessageSize = config.getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
     HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/44b17c63/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 545d93a..af481c8 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -1032,7 +1032,8 @@ public class Master implements LiveTServerSet.Listener, TableObserver,
CurrentSt
       throw new IOException(e);
     }
 
-    Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(new
MasterClientServiceHandler(this)));
+    Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(new
MasterClientServiceHandler(this),
+        new Processor<Iface>(new MasterClientServiceHandler(this)).getProcessMapView()));
     ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT,
processor, "Master",
         "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK,
Property.GENERAL_MAX_MESSAGE_SIZE);
     clientService = sa.server;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/44b17c63/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 3cffa38..651df66 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3159,7 +3159,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
 
   private HostAndPort startTabletClientService() throws UnknownHostException {
     // start listening for client connection last
-    Iface tch = RpcWrapper.service(new ThriftClientHandler());
+    ThriftClientHandler handler = new ThriftClientHandler();
+    Iface tch = RpcWrapper.service(handler, new Processor<Iface>(handler).getProcessMapView());
     Processor<Iface> processor = new Processor<Iface>(tch);
     HostAndPort address = startServer(getSystemConfiguration(), clientAddress.getHostText(),
Property.TSERV_CLIENTPORT, processor, "Thrift Client Server");
     log.info("address = " + address);


Mime
View raw message