Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C2D6D18E3C for ; Wed, 2 Dec 2015 02:38:18 +0000 (UTC) Received: (qmail 63256 invoked by uid 500); 2 Dec 2015 02:38:18 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 63132 invoked by uid 500); 2 Dec 2015 02:38:18 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 63011 invoked by uid 99); 2 Dec 2015 02:38:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Dec 2015 02:38:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 53C91E10BB; Wed, 2 Dec 2015 02:38:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Wed, 02 Dec 2015 02:38:20 -0000 Message-Id: In-Reply-To: <041368f192fa4205a1a81fc8f5612253@git.apache.org> References: <041368f192fa4205a1a81fc8f5612253@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/23] accumulo git commit: ACCUMULO-4065 Work around TExceptions being written back to clients in oneway methods. ACCUMULO-4065 Work around TExceptions being written back to clients in oneway methods. Oneway methods must not throw a TException, otherwise Thrift sends a message back to the client which causes problems for the client which next uses that connection (and finds a message still sitting on the "wire"). Closes apache/accumulo#56 Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/44b17c63 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/44b17c63 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/44b17c63 Branch: refs/heads/master Commit: 44b17c639f4e5b617196f26a8f1ff56ce475221b Parents: e82c3f3 Author: Josh Elser Authored: Tue Dec 1 15:54:50 2015 -0500 Committer: Josh Elser Committed: Tue Dec 1 15:54:50 2015 -0500 ---------------------------------------------------------------------- .../java/org/apache/accumulo/proxy/Proxy.java | 4 +- .../apache/accumulo/server/util/RpcWrapper.java | 74 +++++- .../accumulo/server/util/RpcWrapperTest.java | 261 +++++++++++++++++++ .../accumulo/gc/SimpleGarbageCollector.java | 2 +- .../java/org/apache/accumulo/master/Master.java | 3 +- .../apache/accumulo/tserver/TabletServer.java | 3 +- 6 files changed, 337 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/44b17c63/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java ---------------------------------------------------------------------- diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java index f3ec13b..3368d20 100644 --- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java +++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java @@ -29,6 +29,7 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.accumulo.proxy.thrift.AccumuloProxy; import org.apache.accumulo.server.util.RpcWrapper; import org.apache.log4j.Logger; +import org.apache.thrift.TBaseProcessor; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocolFactory; @@ -135,7 +136,8 @@ public class Proxy { @SuppressWarnings("unchecked") Constructor proxyProcConstructor = (Constructor) proxyProcClass.getConstructor(proxyIfaceClass); - final TProcessor processor = proxyProcConstructor.newInstance(RpcWrapper.service(impl)); + @SuppressWarnings({"rawtypes", "unchecked"}) + final TProcessor processor = proxyProcConstructor.newInstance(RpcWrapper.service(impl, ((TBaseProcessor) proxyProcConstructor.newInstance(impl)).getProcessMapView())); THsHaServer.Args args = new THsHaServer.Args(socket); args.processor(processor); http://git-wip-us.apache.org/repos/asf/accumulo/blob/44b17c63/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java index 1f30b07..aafe37c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/RpcWrapper.java @@ -19,10 +19,16 @@ package org.apache.accumulo.server.util; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import org.apache.accumulo.trace.instrument.thrift.RpcServerInvocationHandler; import org.apache.accumulo.trace.instrument.thrift.TraceWrap; +import org.apache.thrift.ProcessFunction; import org.apache.thrift.TApplicationException; +import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,33 +39,89 @@ import org.slf4j.LoggerFactory; * a network issue, but informs the client that a {@link TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs similar functions as * {@link TraceWrap}, but with the additional action of translating exceptions. See also ACCUMULO-1691 and ACCUMULO-2950. * + * ACCUMULO-4065 found that the above exception-wrapping is not appropriate for Thrift's implementation of oneway methods. Oneway methods are defined as + * a method which the client does not wait for it to return. Normally, this is acceptable as these methods are void. Therefore, if another client reuses + * the connection to send a new RPC, there is no "extra" data sitting on the InputStream from the Socket (that the server sent). However, the implementation + * of a oneway method does send a response to the client when the implementation throws a {@link TException}. This message showing up on the client's + * InputStream causes future use of the Thrift Connection to become unusable. As long as the Thrift implementation sends a message back when oneway methods + * throw a {@link TException}, we much make sure that we don't re-wrap-and-throw any exceptions as {@link TException}s. + * * @since 1.6.1 */ public class RpcWrapper { + private static final Logger log = LoggerFactory.getLogger(RpcWrapper.class); + + public static T service(final T instance, @SuppressWarnings("rawtypes") final Map> processorView) { + final Set onewayMethods = getOnewayMethods(processorView); + log.debug("Found oneway Thrift methods: " + onewayMethods); + + InvocationHandler handler = getInvocationHandler(instance, onewayMethods); + + @SuppressWarnings("unchecked") + T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler); + return proxiedInstance; + } - public static T service(final T instance) { - InvocationHandler handler = new RpcServerInvocationHandler(instance) { + protected static RpcServerInvocationHandler getInvocationHandler(final T instance, final Set onewayMethods) { + return new RpcServerInvocationHandler(instance) { private final Logger log = LoggerFactory.getLogger(instance.getClass()); @Override public Object invoke(Object obj, Method method, Object[] args) throws Throwable { + // e.g. ThriftClientHandler.flush(TInfo, TCredentials, ...) try { return super.invoke(obj, method, args); } catch (RuntimeException e) { String msg = e.getMessage(); log.error(msg, e); + if (onewayMethods.contains(method.getName())) { + throw e; + } throw new TException(msg); } catch (Error e) { String msg = e.getMessage(); log.error(msg, e); + if (onewayMethods.contains(method.getName())) { + throw e; + } throw new TException(msg); } } }; - - @SuppressWarnings("unchecked") - T proxiedInstance = (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler); - return proxiedInstance; } + protected static Set getOnewayMethods(@SuppressWarnings("rawtypes") Map> processorView) { + // Get a handle on the isOnewayMethod and make it accessible + final Method isOnewayMethod; + try { + isOnewayMethod = ProcessFunction.class.getDeclaredMethod("isOneway"); + } catch (NoSuchMethodException e) { + throw new RuntimeException("Could not access isOneway method", e); + } catch (SecurityException e) { + throw new RuntimeException("Could not access isOneway method", e); + } + // In java7, this appears to be copying the method, but it's trivial for us to return the object to how it was before. + final boolean accessible = isOnewayMethod.isAccessible(); + isOnewayMethod.setAccessible(true); + + try { + final Set onewayMethods = new HashSet(); + for (@SuppressWarnings("rawtypes") Entry> entry : processorView.entrySet()) { + try { + if ((Boolean) isOnewayMethod.invoke(entry.getValue())) { + onewayMethods.add(entry.getKey()); + } + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + return onewayMethods; + } finally { + // Reset it back to how it was. + isOnewayMethod.setAccessible(accessible); + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/44b17c63/server/base/src/test/java/org/apache/accumulo/server/util/RpcWrapperTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/RpcWrapperTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/RpcWrapperTest.java new file mode 100644 index 0000000..63a836f --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/util/RpcWrapperTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.trace.instrument.thrift.RpcServerInvocationHandler; +import org.apache.thrift.ProcessFunction; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TProtocol; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Verification that RpcWrapper correctly mangles Exceptions to work around Thrift. + */ +public class RpcWrapperTest { + + private static final String RTE_MESSAGE = "RpcWrapperTest's RuntimeException Message"; + /** + * Given a method name and whether or not the method is oneway, construct a ProcessFunction. + * + * @param methodName The service method name. + * @param isOneway Is the method oneway. + * @return A ProcessFunction. + */ + private fake_proc createProcessFunction(String methodName, boolean isOneway) { + return new fake_proc(methodName, isOneway); + } + + @Test + public void testSomeOnewayMethods() { + @SuppressWarnings("rawtypes") + Map> procs = new HashMap>(); + procs.put("foo", createProcessFunction("foo", true)); + procs.put("foobar", createProcessFunction("foobar", false)); + procs.put("bar", createProcessFunction("bar", true)); + procs.put("barfoo", createProcessFunction("barfoo", false)); + + Set onewayMethods = RpcWrapper.getOnewayMethods(procs); + Assert.assertEquals(Sets.newHashSet("foo", "bar"), onewayMethods); + } + + @Test + public void testNoOnewayMethods() { + @SuppressWarnings("rawtypes") + Map> procs = new HashMap>(); + procs.put("foo", createProcessFunction("foo", false)); + procs.put("foobar", createProcessFunction("foobar", false)); + procs.put("bar", createProcessFunction("bar", false)); + procs.put("barfoo", createProcessFunction("barfoo", false)); + + Set onewayMethods = RpcWrapper.getOnewayMethods(procs); + Assert.assertEquals(Collections. emptySet(), onewayMethods); + } + + @Test + public void testAllOnewayMethods() { + @SuppressWarnings("rawtypes") + Map> procs = new HashMap>(); + procs.put("foo", createProcessFunction("foo", true)); + procs.put("foobar", createProcessFunction("foobar", true)); + procs.put("bar", createProcessFunction("bar", true)); + procs.put("barfoo", createProcessFunction("barfoo", true)); + + Set onewayMethods = RpcWrapper.getOnewayMethods(procs); + Assert.assertEquals(Sets.newHashSet("foo", "foobar", "bar", "barfoo"), onewayMethods); + } + + @Test + public void testNoExceptionWrappingForOneway() throws Throwable { + final Object[] args = new Object[0]; + + final FakeService impl = new FakeServiceImpl(); + + // "short" names throw RTEs and are oneway, while long names do not throw exceptions and are not oneway. + RpcServerInvocationHandler handler = RpcWrapper.getInvocationHandler(impl, Sets.newHashSet("foo", "bar")); + + // Should throw an exception, but not be wrapped because the method is oneway + try { + handler.invoke(impl, FakeServiceImpl.class.getMethod("foo"), args); + Assert.fail("Expected an exception"); + } catch (RuntimeException e) { + Assert.assertEquals(RTE_MESSAGE, e.getMessage()); + } + + // Should not throw an exception + handler.invoke(impl, FakeServiceImpl.class.getMethod("foobar"), args); + } + + @Test + public void testExceptionWrappingForNonOneway() throws Throwable { + final Object[] args = new Object[0]; + + final FakeService impl = new FakeServiceImpl(); + + // "short" names throw RTEs and are not oneway, while long names do not throw exceptions and are oneway. + RpcServerInvocationHandler handler = RpcWrapper.getInvocationHandler(impl, Sets.newHashSet("foobar", "barfoo")); + + // Should throw an exception, but not be wrapped because the method is oneway + try { + handler.invoke(impl, FakeServiceImpl.class.getMethod("foo"), args); + Assert.fail("Expected an exception"); + } catch (TException e) { + // The InvocationHandler should take the exception from the RTE and make it a TException + Assert.assertEquals(RTE_MESSAGE, e.getMessage()); + } + + // Should not throw an exception + handler.invoke(impl, FakeServiceImpl.class.getMethod("foobar"), args); + } + + // + // Some hacked together classes/interfaces that mimic what Thrift is doing. + // + + /** + * Some fake fields for our fake arguments. + */ + private static class fake_fields implements org.apache.thrift.TFieldIdEnum { + @Override + public short getThriftFieldId() { + return 0; + } + + @Override + public String getFieldName() { + return null; + } + } + + /** + * A fake thrift service + */ + interface FakeService { + void foo(); + String foobar(); + int bar(); + long barfoo(); + } + + /** + * An implementation of the fake thrift service. The "short" names throw RTEs, while long names do not. + */ + public static class FakeServiceImpl implements FakeService { + @Override + public void foo() { + throw new RuntimeException(RTE_MESSAGE); + } + + @Override + public String foobar() { + return ""; + } + + @Override + public int bar() { + throw new RuntimeException(RTE_MESSAGE); + } + + @Override + public long barfoo() { + return 0; + } + }; + + /** + * A fake ProcessFunction implementation for testing that allows injection of method name and oneway. + */ + private static class fake_proc extends org.apache.thrift.ProcessFunction { + final private boolean isOneway; + + public fake_proc(String methodName, boolean isOneway) { + super(methodName); + this.isOneway = isOneway; + } + + @Override + protected boolean isOneway() { + return isOneway; + } + + @SuppressWarnings("rawtypes") + @Override + public TBase getResult(I iface, foo_args args) throws TException { + return null; + } + + @Override + public foo_args getEmptyArgsInstance() { + return null; + } + } + + /** + * Fake arguments for our fake service. + */ + private static class foo_args implements org.apache.thrift.TBase { + + private static final long serialVersionUID = 1L; + + @Override + public int compareTo(foo_args o) { + return 0; + } + + @Override + public void read(TProtocol iprot) throws TException { + } + + @Override + public void write(TProtocol oprot) throws TException { + } + + @Override + public fake_fields fieldForId(int fieldId) { + return null; + } + + @Override + public boolean isSet(fake_fields field) { + return false; + } + + @Override + public Object getFieldValue(fake_fields field) { + return null; + } + + @Override + public void setFieldValue(fake_fields field, Object value) {} + + @Override + public TBase deepCopy() { + return null; + } + + @Override + public void clear() {} + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/44b17c63/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 3ab95a7..b4afda8 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -702,7 +702,7 @@ public class SimpleGarbageCollector implements Iface { } private HostAndPort startStatsService() throws UnknownHostException { - Processor processor = new Processor(RpcWrapper.service(this)); + Processor processor = new Processor(RpcWrapper.service(this, new Processor(this).getProcessMapView())); int port = config.getPort(Property.GC_PORT); long maxMessageSize = config.getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE); HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port); http://git-wip-us.apache.org/repos/asf/accumulo/blob/44b17c63/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 545d93a..af481c8 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -1032,7 +1032,8 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt throw new IOException(e); } - Processor processor = new Processor(RpcWrapper.service(new MasterClientServiceHandler(this))); + Processor processor = new Processor(RpcWrapper.service(new MasterClientServiceHandler(this), + new Processor(new MasterClientServiceHandler(this)).getProcessMapView())); ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); clientService = sa.server; http://git-wip-us.apache.org/repos/asf/accumulo/blob/44b17c63/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 3cffa38..651df66 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -3159,7 +3159,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu private HostAndPort startTabletClientService() throws UnknownHostException { // start listening for client connection last - Iface tch = RpcWrapper.service(new ThriftClientHandler()); + ThriftClientHandler handler = new ThriftClientHandler(); + Iface tch = RpcWrapper.service(handler, new Processor(handler).getProcessMapView()); Processor processor = new Processor(tch); HostAndPort address = startServer(getSystemConfiguration(), clientAddress.getHostText(), Property.TSERV_CLIENTPORT, processor, "Thrift Client Server"); log.info("address = " + address);