Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DEBD618965 for ; Wed, 16 Sep 2015 01:13:02 +0000 (UTC) Received: (qmail 2530 invoked by uid 500); 16 Sep 2015 01:13:02 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 2489 invoked by uid 500); 16 Sep 2015 01:13:02 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 2480 invoked by uid 99); 16 Sep 2015 01:13:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Sep 2015 01:13:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8828BDFD45; Wed, 16 Sep 2015 01:13:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM. (hitesh) Date: Wed, 16 Sep 2015 01:13:02 +0000 (UTC) Repository: tez Updated Branches: refs/heads/branch-0.7 f96d12ae5 -> d382117a7 TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM. (hitesh) (cherry picked from commit 28cd991b86c0e216e80f9246d8c0bddaa5b0f97c) Conflicts: CHANGES.txt tez-api/src/main/java/org/apache/tez/client/TezClient.java Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d382117a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d382117a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d382117a Branch: refs/heads/branch-0.7 Commit: d382117a793c75ae6bad9b92efbe063c1269c45c Parents: f96d12a Author: Hitesh Shah Authored: Thu Aug 6 11:08:30 2015 -0700 Committer: Hitesh Shah Committed: Tue Sep 15 17:01:46 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/client/TezClient.java | 5 +- .../java/org/apache/tez/common/RPCUtil.java | 173 ++++++++++++++++++ .../tez/dag/api/client/DAGClientImpl.java | 4 + .../dag/api/client/rpc/DAGClientRPCImpl.java | 25 +-- .../java/org/apache/tez/common/TestRPCUtil.java | 181 +++++++++++++++++++ .../org/apache/tez/test/TestAMRecovery.java | 3 +- 7 files changed, 377 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d382117a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6adb38e..3e6df02 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM. TEZ-2825. Report progress in terms of completed tasks to reduce load on AM for Tez UI TEZ-2812. Tez UI: Update task & attempt tables while in progress. TEZ-2786. Tez UI: Update vertex, task & attempt details page while in progress. http://git-wip-us.apache.org/repos/asf/tez/blob/d382117a/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 0710cdd..da31141 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -25,7 +25,6 @@ import java.util.Map; import javax.annotation.Nullable; -import org.apache.tez.common.JavaOptsChecker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -43,7 +42,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.tez.common.counters.Limits; +import org.apache.tez.common.JavaOptsChecker; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.common.RPCUtil; import org.apache.tez.common.security.HistoryACLPolicyManager; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.DAG; @@ -501,7 +502,7 @@ public class TezClient { dagId = response.getDagId(); } } catch (ServiceException e) { - throw new TezException(e); + RPCUtil.unwrapAndThrowException(e); } LOG.info("Submitted dag to TezSession" + ", sessionName=" + clientName http://git-wip-us.apache.org/repos/asf/tez/blob/d382117a/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java b/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java new file mode 100644 index 0000000..caeb822 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java @@ -0,0 +1,173 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.common; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +import org.apache.hadoop.ipc.RemoteException; +import org.apache.tez.dag.api.DAGNotRunningException; +import org.apache.tez.dag.api.SessionNotRunning; +import org.apache.tez.dag.api.TezException; + +import com.google.protobuf.ServiceException; + +public class RPCUtil { + + /** + * Returns an instance of {@link TezException} + */ + public static TezException getRemoteException(Throwable t) { + return new TezException(t); + } + + /** + * Returns an instance of {@link TezException} + */ + public static TezException getRemoteException(String message) { + return new TezException(message); + } + + private static T instantiateException( + Class cls, RemoteException re) throws RemoteException { + try { + Constructor cn = cls.getConstructor(String.class); + cn.setAccessible(true); + T ex = cn.newInstance(re.getMessage()); + ex.initCause(re); + return ex; + // RemoteException contains useful information as against the + // java.lang.reflect exceptions. + } catch (NoSuchMethodException e) { + throw re; + } catch (IllegalArgumentException e) { + throw re; + } catch (SecurityException e) { + throw re; + } catch (InstantiationException e) { + throw re; + } catch (IllegalAccessException e) { + throw re; + } catch (InvocationTargetException e) { + throw re; + } + } + + private static T instantiateTezException( + Class cls, RemoteException re) throws RemoteException { + return instantiateException(cls, re); + } + + private static T instantiateIOException( + Class cls, RemoteException re) throws RemoteException { + return instantiateException(cls, re); + } + + private static T instantiateRuntimeException( + Class cls, RemoteException re) throws RemoteException { + return instantiateException(cls, re); + } + + private static T instantiateSessionNotRunningException( + Class cls, RemoteException re) throws RemoteException { + return instantiateException(cls, re); + } + + + /** + * Utility method that unwraps and returns appropriate exceptions. + * + * @param se + * ServiceException + * @return An instance of the actual exception, which will be a subclass of + * {@link TezException} or {@link IOException} + */ + public static Void unwrapAndThrowException(ServiceException se) + throws IOException, TezException { + + Throwable cause = se.getCause(); + if (cause == null) { + // SE generated by the RPC layer itself. + throw new IOException(se); + } else { + if (cause instanceof RemoteException) { + RemoteException re = (RemoteException) cause; + Class realClass = null; + try { + realClass = Class.forName(re.getClassName()); + } catch (ClassNotFoundException cnf) { + // Assume this to be a new exception type added to Tez. This isn't + // absolutely correct since the RPC layer could add an exception as + // well. + throw instantiateTezException(TezException.class, re); + } + + if (SessionNotRunning.class.isAssignableFrom(realClass)) { + throw instantiateTezException( + realClass.asSubclass(SessionNotRunning.class), re); + } else if (DAGNotRunningException.class.isAssignableFrom(realClass)) { + throw instantiateTezException( + realClass.asSubclass(DAGNotRunningException.class), re); + } else if (TezException.class.isAssignableFrom(realClass)) { + throw instantiateTezException( + realClass.asSubclass(TezException.class), re); + } else if (IOException.class.isAssignableFrom(realClass)) { + throw instantiateIOException(realClass.asSubclass(IOException.class), + re); + } else if (RuntimeException.class.isAssignableFrom(realClass)) { + throw instantiateRuntimeException( + realClass.asSubclass(RuntimeException.class), re); + } else { + throw re; + } + // RemoteException contains useful information as against the + // java.lang.reflect exceptions. + + } else if (cause instanceof IOException) { + // RPC Client exception. + throw (IOException) cause; + } else if (cause instanceof RuntimeException) { + // RPC RuntimeException + throw (RuntimeException) cause; + } else { + // Should not be generated. + throw new IOException(se); + } + } + } + + /** + * Utility method that unwraps and returns appropriate exceptions. + * + * @param se + * ServiceException + * @return An instance of the actual exception, which will be a subclass of + * {@link TezException} or {@link IOException} + */ + public static Void unwrapAndThrowNonIOException(ServiceException se) + throws TezException { + try { + return unwrapAndThrowException(se); + } catch (IOException ioe) { + throw new TezException(ioe); + } + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/d382117a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java index 4e2ff40..47c8a8e 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java @@ -353,6 +353,8 @@ public class DAGClientImpl extends DAGClient { dagCompleted = true; } catch (TezException e) { // can be either due to a n/w issue of due to AM completed. + } catch (IOException e) { + // can be either due to a n/w issue of due to AM completed. } if (dagStatus == null && !dagCompleted) { @@ -371,6 +373,8 @@ public class DAGClientImpl extends DAGClient { dagCompleted = true; } catch (TezException e) { // can be either due to a n/w issue of due to AM completed. + } catch (IOException e) { + // can be either due to a n/w issue of due to AM completed. } if (vertexStatus == null && !dagCompleted) { http://git-wip-us.apache.org/repos/asf/tez/blob/d382117a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java index 223c0ab..240289c 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java @@ -23,11 +23,11 @@ import java.util.Set; import javax.annotation.Nullable; +import org.apache.tez.common.RPCUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -52,6 +52,7 @@ import com.google.protobuf.ServiceException; @Private public class DAGClientRPCImpl extends DAGClient { + private static final Logger LOG = LoggerFactory.getLogger(DAGClientRPCImpl.class); private static final String DAG_NOT_RUNNING_CLASS_NAME = @@ -96,6 +97,9 @@ public class DAGClientRPCImpl extends DAGClient { } catch (TezException e) { resetProxy(e); // create proxy again throw e; + } catch (IOException e) { + resetProxy(e); // create proxy again + throw e; } } @@ -113,6 +117,9 @@ public class DAGClientRPCImpl extends DAGClient { } catch (TezException e) { resetProxy(e); // create proxy again throw e; + } catch (IOException e) { + resetProxy(e); // create proxy again + throw e; } } @@ -176,22 +183,15 @@ public class DAGClientRPCImpl extends DAGClient { proxy.getDAGStatus(null, requestProtoBuilder.build()).getDagStatus(), DagStatusSource.AM); } catch (ServiceException e) { - final Throwable cause = e.getCause(); - if (cause instanceof RemoteException) { - RemoteException remoteException = (RemoteException) cause; - if (DAG_NOT_RUNNING_CLASS_NAME.equals(remoteException.getClassName())) { - throw new DAGNotRunningException(remoteException.getMessage()); - } - } - - // TEZ-151 retrieve wrapped TezException + RPCUtil.unwrapAndThrowException(e); + // Should not reach here throw new TezException(e); } } VertexStatus getVertexStatusViaAM(String vertexName, Set statusOptions) - throws TezException { + throws TezException, IOException { if (LOG.isDebugEnabled()) { LOG.debug("GetVertexStatus via AM for app: " + appId + " dag: " + dagId + " vertex: " + vertexName); @@ -211,7 +211,8 @@ public class DAGClientRPCImpl extends DAGClient { proxy.getVertexStatus(null, requestProtoBuilder.build()).getVertexStatus()); } catch (ServiceException e) { - // TEZ-151 retrieve wrapped TezException + RPCUtil.unwrapAndThrowException(e); + // Should not reach here throw new TezException(e); } } http://git-wip-us.apache.org/repos/asf/tez/blob/d382117a/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java b/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java new file mode 100644 index 0000000..1e63b47 --- /dev/null +++ b/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.tez.dag.api.SessionNotRunning; +import org.apache.tez.dag.api.TezException; +import org.junit.Assert; + +import org.apache.hadoop.ipc.RemoteException; +import org.junit.Test; + +import com.google.protobuf.ServiceException; + +public class TestRPCUtil { + + @Test (timeout=1000) + public void testUnknownExceptionUnwrapping() { + Class exception = TezException.class; + String className = "UnknownException.class"; + verifyRemoteExceptionUnwrapping(exception, className); + } + + @Test + public void testRemoteIOExceptionUnwrapping() { + Class exception = IOException.class; + verifyRemoteExceptionUnwrapping(exception, exception.getName()); + } + + @Test + public void testRemoteIOExceptionDerivativeUnwrapping() { + // Test IOException sub-class + Class exception = FileNotFoundException.class; + verifyRemoteExceptionUnwrapping(exception, exception.getName()); + } + + @Test + public void testRemoteTezExceptionUnwrapping() { + Class exception = TezException.class; + verifyRemoteExceptionUnwrapping(exception, exception.getName()); + + } + + @Test + public void testRemoteTezExceptionDerivativeUnwrapping() { + Class exception = SessionNotRunning.class; + verifyRemoteExceptionUnwrapping(exception, exception.getName()); + } + + @Test + public void testRemoteRuntimeExceptionUnwrapping() { + Class exception = NullPointerException.class; + verifyRemoteExceptionUnwrapping(exception, exception.getName()); + } + + @Test + public void testUnexpectedRemoteExceptionUnwrapping() { + // Non IOException, TezException thrown by the remote side. + Class exception = Exception.class; + verifyRemoteExceptionUnwrapping(RemoteException.class, exception.getName()); + } + + @Test + public void testRemoteTezExceptionWithoutStringConstructor() { + // Derivatives of TezException should always define a string constructor. + Class exception = TezTestExceptionNoConstructor.class; + verifyRemoteExceptionUnwrapping(RemoteException.class, exception.getName()); + } + + @Test + public void testRPCServiceExceptionUnwrapping() { + String message = "ServiceExceptionMessage"; + ServiceException se = new ServiceException(message); + + Throwable t = null; + try { + RPCUtil.unwrapAndThrowException(se); + } catch (Throwable thrown) { + t = thrown; + } + + Assert.assertTrue(IOException.class.isInstance(t)); + Assert.assertTrue(t.getMessage().contains(message)); + } + + @Test + public void testRPCIOExceptionUnwrapping() { + String message = "DirectIOExceptionMessage"; + IOException ioException = new FileNotFoundException(message); + ServiceException se = new ServiceException(ioException); + + Throwable t = null; + try { + RPCUtil.unwrapAndThrowException(se); + } catch (Throwable thrown) { + t = thrown; + } + Assert.assertTrue(FileNotFoundException.class.isInstance(t)); + Assert.assertTrue(t.getMessage().contains(message)); + } + + @Test + public void testRPCRuntimeExceptionUnwrapping() { + String message = "RPCRuntimeExceptionUnwrapping"; + RuntimeException re = new NullPointerException(message); + ServiceException se = new ServiceException(re); + + Throwable t = null; + try { + RPCUtil.unwrapAndThrowException(se); + } catch (Throwable thrown) { + t = thrown; + } + + Assert.assertTrue(NullPointerException.class.isInstance(t)); + Assert.assertTrue(t.getMessage().contains(message)); + } + + private void verifyRemoteExceptionUnwrapping( + Class expectedLocalException, + String realExceptionClassName) { + verifyRemoteExceptionUnwrapping(expectedLocalException, realExceptionClassName, true); + } + + private void verifyRemoteExceptionUnwrapping( + Class expectedLocalException, + String realExceptionClassName, boolean allowIO) { + String message = realExceptionClassName + "Message"; + RemoteException re = new RemoteException(realExceptionClassName, message); + ServiceException se = new ServiceException(re); + + Throwable t = null; + try { + if (allowIO) { + RPCUtil.unwrapAndThrowException(se); + } else { + RPCUtil.unwrapAndThrowNonIOException(se); + } + } catch (Throwable thrown) { + t = thrown; + } + + Assert.assertTrue("Expected exception [" + expectedLocalException + + "] but found " + t, expectedLocalException.isInstance(t)); + Assert.assertTrue( + "Expected message [" + message + "] but found " + t.getMessage(), t + .getMessage().contains(message)); + } + + + @Test (timeout=1000) + public void testRemoteNonIOExceptionUnwrapping() { + Class exception = TezException.class; + verifyRemoteExceptionUnwrapping(exception, IOException.class.getName(), false); + } + + + private static class TezTestExceptionNoConstructor extends + Exception { + private static final long serialVersionUID = 1L; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/d382117a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java index 1d17b23..7d7069e 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java @@ -235,6 +235,7 @@ public class TestAMRecovery { createDAG("VertexCompletelyFinished_Broadcast", ControlledImmediateStartVertexManager.class, DataMovementType.BROADCAST, false); TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED); + assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue()); assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue()); @@ -483,7 +484,7 @@ public class TestAMRecovery { "application", "dag") + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX); if (fs.exists(recoveryFilePath)) { - LOG.info("read recovery file:" + recoveryFilePath); + LOG.info("Read recovery file:" + recoveryFilePath); historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(fs.open(recoveryFilePath))); } }