Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 97C0D17796 for ; Fri, 22 Jan 2016 07:19:28 +0000 (UTC) Received: (qmail 15905 invoked by uid 500); 22 Jan 2016 07:19:28 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 15873 invoked by uid 500); 22 Jan 2016 07:19:28 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 15864 invoked by uid 99); 22 Jan 2016 07:19:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Jan 2016 07:19:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 824CDE0098; Fri, 22 Jan 2016 07:19:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jhkim@apache.org To: commits@tajo.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: tajo git commit: TAJO-2048: QueryMaster and TajoWorker should support the exception propagation. Date: Fri, 22 Jan 2016 07:19:27 +0000 (UTC) Repository: tajo Updated Branches: refs/heads/branch-0.11.1 02c1f1932 -> 031cf3205 TAJO-2048: QueryMaster and TajoWorker should support the exception propagation. Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/031cf320 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/031cf320 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/031cf320 Branch: refs/heads/branch-0.11.1 Commit: 031cf3205c135e43a8e944db19a50797138c26c4 Parents: 02c1f19 Author: Jinho Kim Authored: Fri Jan 22 16:18:39 2016 +0900 Committer: Jinho Kim Committed: Fri Jan 22 16:18:39 2016 +0900 ---------------------------------------------------------------------- CHANGES | 3 ++ .../org/apache/tajo/exception/ErrorUtil.java | 25 ++++++++++++- .../apache/tajo/exception/ReturnStateUtil.java | 14 ++++++- .../tajo/cli/tsql/TestTajoCliNegatives.java | 2 +- .../apache/tajo/worker/MockExecutionBlock.java | 2 +- .../physical/RangeShuffleFileWriteExec.java | 3 +- .../java/org/apache/tajo/master/QueryInfo.java | 2 + .../org/apache/tajo/master/QueryManager.java | 5 ++- .../tajo/master/TajoMasterClientService.java | 12 +++--- .../tajo/master/event/StageTaskFailedEvent.java | 39 ++++++++++++++++++++ .../tajo/master/event/TaskFatalErrorEvent.java | 17 +++++---- .../master/event/TaskTAttemptFailedEvent.java | 36 ++++++++++++++++++ .../tajo/querymaster/DefaultTaskScheduler.java | 6 +-- .../java/org/apache/tajo/querymaster/Query.java | 22 ++++++++--- .../apache/tajo/querymaster/QueryMaster.java | 8 +++- .../tajo/querymaster/QueryMasterTask.java | 5 ++- .../java/org/apache/tajo/querymaster/Stage.java | 31 ++++++++++++---- .../java/org/apache/tajo/querymaster/Task.java | 17 ++++----- .../apache/tajo/querymaster/TaskAttempt.java | 8 ++-- .../tajo/worker/ExecutionBlockContext.java | 11 +++--- .../org/apache/tajo/worker/TaskContainer.java | 2 +- .../org/apache/tajo/worker/TaskExecutor.java | 3 +- .../java/org/apache/tajo/worker/TaskImpl.java | 16 ++------ tajo-core/src/main/proto/ResourceProtos.proto | 11 +++--- 24 files changed, 220 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 79fdc38..3dab1cb 100644 --- a/CHANGES +++ b/CHANGES @@ -7,6 +7,9 @@ Release 0.11.1 - unreleased IMPROVEMENT + TAJO-2048: QueryMaster and TajoWorker should support the exception + propagation. (jinho) + TAJO-2050: Adopt TAJO logo in CLI. (Dongkyu Hwangbo via jaehwa) http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java index 025a20c..9a71bd6 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java @@ -18,6 +18,8 @@ package org.apache.tajo.exception; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tajo.error.Errors; import org.apache.tajo.error.Errors.ResultCode; import org.apache.tajo.error.Stacktrace; @@ -32,13 +34,32 @@ public class ErrorUtil { public static Stacktrace.StackTrace convertStacktrace(Throwable t) { Stacktrace.StackTrace.Builder builder = Stacktrace.StackTrace.newBuilder(); - for (StackTraceElement element: t.getStackTrace()) { + for (StackTraceElement element : t.getStackTrace()) { builder.addElement(Stacktrace.StackTrace.Element.newBuilder() - .setFilename(element.getFileName()) + .setFilename(element.getFileName() == null ? "(Unknown Source)" : element.getFileName()) .setFunction(element.getClassName() + "::" + element.getMethodName()) .setLine(element.getLineNumber()) ); } return builder.build(); } + + public static Errors.SerializedException convertException(Throwable t) { + Errors.SerializedException.Builder builder = Errors.SerializedException.newBuilder(); + + if (ExceptionUtil.isExceptionWithResultCode(t)) { + DefaultTajoException tajoException = (DefaultTajoException) t; + builder.setReturnCode(tajoException.getErrorCode()); + builder.setMessage(tajoException.getMessage()); + } else { + Throwable rootCause = ExceptionUtils.getRootCause(t); + if(rootCause != null) t = rootCause; + + builder.setReturnCode(ResultCode.INTERNAL_ERROR); + builder.setMessage(ErrorMessages.getInternalErrorMessage(t)); + } + builder.setStackTrace(ErrorUtil.convertStacktrace(t)); + builder.setTimestamp(System.currentTimeMillis()); + return builder.build(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java index 01845f4..833af63 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java @@ -20,6 +20,7 @@ package org.apache.tajo.exception; import com.google.common.base.Preconditions; import org.apache.tajo.QueryId; +import org.apache.tajo.error.Errors; import org.apache.tajo.error.Errors.ResultCode; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListResponse; @@ -83,9 +84,20 @@ public class ReturnStateUtil { } else { builder.setReturnCode(ResultCode.INTERNAL_ERROR); builder.setMessage(ErrorMessages.getInternalErrorMessage(t)); - builder.setStackTrace(ErrorUtil.convertStacktrace(t)); } + builder.setStackTrace(ErrorUtil.convertStacktrace(t)); + return builder.build(); + } + + public static ReturnState returnError(Errors.SerializedException e) { + ReturnState.Builder builder = ReturnState.newBuilder(); + + builder.setReturnCode(e.getReturnCode()); + builder.setMessage(e.getMessage()); + if (e.hasStackTrace()) { + builder.setStackTrace(e.getStackTrace()); + } return builder.build(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java index bf4ffcf..3297550 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java @@ -141,6 +141,6 @@ public class TestTajoCliNegatives extends QueryTestCaseBase { public void testQueryFailure() throws Exception { setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName()); assertScriptFailure("select fail(3, l_orderkey, 'testQueryFailure') from default.lineitem where l_orderkey > 0" , - "ERROR: Internal error. Please check out log files in ${tajo_install_dir}/logs directory.\n"); + "ERROR: internal error: testQueryFailure\n"); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java index 7d7fb1a..cbc4312 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java @@ -36,7 +36,7 @@ public class MockExecutionBlock extends ExecutionBlockContext { } @Override - public void fatalError(TaskAttemptId taskAttemptId, String message) { + public void fatalError(TaskAttemptId taskAttemptId, Throwable throwable) { } } http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index e4217b3..776a783 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -28,7 +28,6 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.KeyProjector; import org.apache.tajo.plan.logical.ShuffleFileWriteNode; import org.apache.tajo.plan.util.PlannerUtil; @@ -74,7 +73,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs); keyProjector = new KeyProjector(inSchema, keySchema.toArray()); - BSTIndex bst = new BSTIndex(new TajoConf()); + BSTIndex bst = new BSTIndex(context.getConf()); this.comp = new BaseTupleComparator(keySchema, sortSpecs); Path storeTablePath = new Path(context.getWorkDir(), "output"); LOG.info("Output data directory: " + storeTablePath); http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java index 38e9403..9508153 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java @@ -44,6 +44,8 @@ public class QueryInfo implements GsonObject, History, Comparable { private volatile long startTime; @Expose private volatile long finishTime; + + @Deprecated @Expose private String lastMessage; @Expose http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index ba421bd..95ecc80 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -346,7 +346,10 @@ public class QueryManager extends CompositeService { queryInfo.setQueryMaster(connectionInfo.getHost()); queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort()); queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort()); - queryInfo.setLastMessage(queryHeartbeat.getStatusMessage()); + if(queryHeartbeat.hasError()) { + //TODO set error instead of last message + queryInfo.setLastMessage(queryHeartbeat.getError().getMessage()); + } queryInfo.setQueryState(queryHeartbeat.getState()); queryInfo.setProgress(queryHeartbeat.getQueryProgress()); http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 59059c8..63d3bdb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -38,10 +38,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.exception.QueryNotFoundException; -import org.apache.tajo.exception.ReturnStateUtil; -import org.apache.tajo.exception.UnavailableTableLocationException; -import org.apache.tajo.exception.UndefinedDatabaseException; +import org.apache.tajo.exception.*; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.ipc.TajoMasterClientProtocol; @@ -60,6 +57,7 @@ import org.apache.tajo.session.Session; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; +import org.apache.tajo.util.StringUtils; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -111,7 +109,7 @@ public class TajoMasterClientService extends AbstractService { @Override public void serviceStop() throws Exception { if (server != null) { - server.shutdown(); + server.shutdown(true); } super.serviceStop(); } @@ -507,6 +505,10 @@ public class TajoMasterClientService extends AbstractService { builder.setFinishTime(queryInfo.getFinishTime()); } else { builder.setFinishTime(System.currentTimeMillis()); + + if(!StringUtils.isEmpty(queryInfo.getLastMessage())) { + builder.setErrorMessage(queryInfo.getLastMessage()); + } } } else { Session session = context.getSessionManager().getSession(request.getSessionId().getId()); http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java new file mode 100644 index 0000000..731134e --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import org.apache.tajo.TaskId; +import org.apache.tajo.error.Errors.SerializedException; +import org.apache.tajo.master.TaskState; + +/** + * Event Class: From Task to Stage + */ +public class StageTaskFailedEvent extends StageTaskEvent { + private final SerializedException exception; + + public StageTaskFailedEvent(TaskId taskId, SerializedException exception) { + super(taskId, TaskState.FAILED); + this.exception = exception; + } + + public SerializedException getException() { + return exception; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java index d50fcb8..351a42b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java @@ -20,22 +20,23 @@ package org.apache.tajo.master.event; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.ResourceProtos.TaskFatalErrorReport; +import org.apache.tajo.error.Errors.SerializedException; +import org.apache.tajo.exception.ErrorUtil; public class TaskFatalErrorEvent extends TaskAttemptEvent { - private final String message; + private final SerializedException error; public TaskFatalErrorEvent(TaskFatalErrorReport report) { - super(new TaskAttemptId(report.getId()), - TaskAttemptEventType.TA_FATAL_ERROR); - this.message = report.getErrorMessage(); + super(new TaskAttemptId(report.getId()), TaskAttemptEventType.TA_FATAL_ERROR); + this.error = report.getError(); } - public TaskFatalErrorEvent(TaskAttemptId attemptId, String message) { + public TaskFatalErrorEvent(TaskAttemptId attemptId, Throwable e) { super(attemptId, TaskAttemptEventType.TA_FATAL_ERROR); - this.message = message; + this.error = ErrorUtil.convertException(e); } - public String errorMessage() { - return message; + public SerializedException getError() { + return error; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java new file mode 100644 index 0000000..77e0c4a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.error.Errors.SerializedException; + +public class TaskTAttemptFailedEvent extends TaskTAttemptEvent { + private final SerializedException exception; + + public TaskTAttemptFailedEvent(TaskAttemptId attemptId, + SerializedException exception) { + super(attemptId, TaskEventType.T_ATTEMPT_FAILED); + this.exception = exception; + } + + public SerializedException getException() { + return exception; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index ef19ec9..8142e39 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -98,7 +98,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { @Override public void init(Configuration conf) { tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class); - rpcParams = RpcParameterFactory.get(new TajoConf()); + rpcParams = RpcParameterFactory.get(tajoConf); scheduledRequests = new ScheduledRequests(); minTaskMemory = tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY); @@ -117,11 +117,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { break; } else { LOG.fatal(e.getMessage(), e); - stage.abort(StageState.ERROR); + stage.abort(StageState.ERROR, e); } } catch (Throwable e) { LOG.fatal(e.getMessage(), e); - stage.abort(StageState.ERROR); + stage.abort(StageState.ERROR, e); break; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index f06d28c..d447476 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -44,7 +44,8 @@ import org.apache.tajo.engine.planner.global.ExecutionBlockCursor; import org.apache.tajo.engine.planner.global.ExecutionQueue; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.error.Errors.SerializedException; +import org.apache.tajo.exception.ErrorUtil; import org.apache.tajo.master.event.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; @@ -81,11 +82,12 @@ public class Query implements EventHandler { private long startTime; private long finishTime; private TableDesc resultDesc; - private int completedStagesCount = 0; - private int succeededStagesCount = 0; - private int killedStagesCount = 0; - private int failedStagesCount = 0; - private int erroredStagesCount = 0; + private volatile int completedStagesCount = 0; + private volatile int succeededStagesCount = 0; + private volatile int killedStagesCount = 0; + private volatile int failedStagesCount = 0; + private volatile int erroredStagesCount = 0; + private volatile SerializedException failureReason; private final List diagnostics = new ArrayList(); // Internal Variables @@ -343,6 +345,10 @@ public class Query implements EventHandler { } } + public SerializedException getFailureReason() { + return failureReason; + } + public List getDiagnostics() { readLock.lock(); try { @@ -529,6 +535,8 @@ public class Query implements EventHandler { query.clearPartitions(); } } catch (Throwable e) { + LOG.fatal(e.getMessage(), e); + query.failureReason = ErrorUtil.convertException(e); query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e))); return QueryState.QUERY_ERROR; } @@ -775,8 +783,10 @@ public class Query implements EventHandler { query.killedStagesCount++; } else if (castEvent.getState() == StageState.FAILED) { query.failedStagesCount++; + query.failureReason = query.getStage(castEvent.getExecutionBlockId()).getFailureReason(); } else if (castEvent.getState() == StageState.ERROR) { query.erroredStagesCount++; + query.failureReason = query.getStage(castEvent.getExecutionBlockId()).getFailureReason(); } else { LOG.error(String.format("Invalid Stage (%s) State %s at %s", castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name())); http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index cd623c1..946cd03 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -37,7 +37,7 @@ import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.exception.ReturnStateUtil; +import org.apache.tajo.exception.ErrorUtil; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService; import org.apache.tajo.master.event.QueryStartEvent; @@ -351,9 +351,13 @@ public class QueryMaster extends CompositeService implements EventHandler { builder.setResultDesc(queryMasterTask.getQuery().getResultDesc().getProto()); } builder.setQueryProgress(queryMasterTask.getQuery().getProgress()); + + if(queryMasterTask.getQuery().getFailureReason() != null) { + builder.setError(queryMasterTask.getQuery().getFailureReason()); + } } if (queryMasterTask.isInitError()) { - builder.setStatusMessage(ReturnStateUtil.returnError(queryMasterTask.getInitError()).getMessage()); + builder.setError(ErrorUtil.convertException(queryMasterTask.getInitError())); } return builder.build(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index bcfb938..b010f1c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -37,6 +37,7 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.event.*; @@ -266,12 +267,12 @@ public class QueryMasterTask extends CompositeService { if(!callFuture.get().getValue()){ getEventHandler().handle( - new TaskFatalErrorEvent(taskAttemptId, "Can't kill task :" + taskAttemptId)); + new TaskFatalErrorEvent(taskAttemptId, new TajoInternalError("Can't kill task :" + taskAttemptId))); } } catch (Exception e) { /* Node RPC failure */ LOG.error(e.getMessage(), e); - getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e.getMessage())); + getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 8e8236d..51c5431 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -42,7 +42,10 @@ import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.error.Errors.SerializedException; +import org.apache.tajo.exception.ErrorUtil; import org.apache.tajo.exception.TajoException; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TaskState; import org.apache.tajo.master.event.*; @@ -290,6 +293,7 @@ public class Stage implements EventHandler { private volatile int succeededObjectCount = 0; private volatile int killedObjectCount = 0; private volatile int failedObjectCount = 0; + private volatile SerializedException failureReason; private TaskSchedulerContext schedulerContext; private List hashShuffleIntermediateEntries = Lists.newArrayList(); private AtomicInteger completedShuffleTasks = new AtomicInteger(0); @@ -413,6 +417,10 @@ public class Stage implements EventHandler { return completedTaskCount; } + public SerializedException getFailureReason() { + return failureReason; + } + public ExecutionBlock getBlock() { return block; } @@ -534,18 +542,26 @@ public class Stage implements EventHandler { eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED)); } + public void abort(StageState finalState) { + abort(finalState, null); + } + /** * It finalizes this stage. Unlike {@link Stage#complete()}, * it is invoked when a stage is abnormally finished. * * @param finalState The final stage state + * @param reason The failure reason, if exist */ - public void abort(StageState finalState) { + public void abort(StageState finalState, Throwable reason) { // TODO - // - committer.abortStage(...) // - record Stage Finish Time // - CleanUp Tasks // - Record History + if(reason != null) + failureReason = ErrorUtil.convertException(reason); + cleanup(); setFinishTime(); eventHandler.handle(new StageCompletedEvent(getId(), finalState)); @@ -1230,7 +1246,9 @@ public class Stage implements EventHandler { } else if (task.getState() == TaskState.KILLED) { stage.killedObjectCount++; } else if (task.getState() == TaskState.FAILED) { + StageTaskFailedEvent failedEvent = TUtil.checkTypeAndGet(event, StageTaskFailedEvent.class); stage.failedObjectCount++; + stage.failureReason = failedEvent.getException(); // if at least one task is failed, try to kill all tasks. stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL)); } @@ -1408,17 +1426,14 @@ public class Stage implements EventHandler { stage.getSucceededObjectCount(), stage.killedObjectCount)); - if (stage.killedObjectCount > 0 || stage.failedObjectCount > 0) { + // If the current stage are failed, next stages receives SQ_KILL event + if (stage.killedObjectCount + stage.failedObjectCount > 0) { if (stage.failedObjectCount > 0) { stage.abort(StageState.FAILED); return StageState.FAILED; - } else if (stage.killedObjectCount > 0) { + } else { stage.abort(StageState.KILLED); return StageState.KILLED; - } else { - LOG.error("Invalid State " + stage.getSynchronizedState() + " State"); - stage.abort(StageState.ERROR); - return StageState.ERROR; } } else { stage.complete(); @@ -1426,7 +1441,7 @@ public class Stage implements EventHandler { } } catch (Throwable t) { LOG.error(t.getMessage(), t); - stage.abort(StageState.ERROR); + stage.abort(StageState.ERROR, t); return StageState.ERROR; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java index 0eba088..b951a60 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java @@ -45,6 +45,7 @@ import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.Pair; +import org.apache.tajo.util.TUtil; import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.util.history.TaskHistory; @@ -624,10 +625,8 @@ public class Task implements EventHandler { private static class AttemptFailedTransition implements SingleArcTransition { @Override public void transition(Task task, TaskEvent event) { - if (!(event instanceof TaskTAttemptEvent)) { - throw new IllegalArgumentException("event should be a TaskTAttemptEvent type."); - } - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; + TaskTAttemptFailedEvent attemptEvent = TUtil.checkTypeAndGet(event, TaskTAttemptFailedEvent.class); + LOG.info("============================================================="); LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<"); LOG.info("============================================================="); @@ -635,7 +634,7 @@ public class Task implements EventHandler { task.finishedAttempts++; task.finishTask(); - task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.FAILED)); + task.eventHandler.handle(new StageTaskFailedEvent(task.getId(), attemptEvent.getException())); } } @@ -644,10 +643,8 @@ public class Task implements EventHandler { @Override public TaskState transition(Task task, TaskEvent taskEvent) { - if (!(taskEvent instanceof TaskTAttemptEvent)) { - throw new IllegalArgumentException("taskEvent should be a TaskTAttemptEvent type."); - } - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent; + TaskTAttemptFailedEvent attemptEvent = TUtil.checkTypeAndGet(taskEvent, TaskTAttemptFailedEvent.class); + task.failedAttempts++; task.finishedAttempts++; boolean retry = task.failedAttempts < task.maxAttempts; @@ -663,7 +660,7 @@ public class Task implements EventHandler { } } else { task.finishTask(); - task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.FAILED)); + task.eventHandler.handle(new StageTaskFailedEvent(task.getId(), attemptEvent.getException())); return TaskState.FAILED; } http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java index cda62a4..5eef883 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java @@ -410,7 +410,7 @@ public class TaskAttempt implements EventHandler { taskAttempt.fillTaskStatistics(report); taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED)); } catch (Throwable t) { - taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage())); + taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t)); taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t)); } } @@ -433,10 +433,10 @@ public class TaskAttempt implements EventHandler { throw new IllegalArgumentException("event should be a TaskFatalErrorEvent type."); } TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event; - taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED)); - taskAttempt.addDiagnosticInfo(errorEvent.errorMessage()); + taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(taskAttempt.getId(), errorEvent.getError())); + taskAttempt.addDiagnosticInfo(errorEvent.getError().getMessage()); LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost() - + " >> " + errorEvent.errorMessage()); + + " >> " + errorEvent.getError().getMessage()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index c86c311..83a9aff 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -38,8 +38,9 @@ import org.apache.tajo.TajoProtos; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.ErrorUtil; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.serder.PlanProto; @@ -308,13 +309,13 @@ public class ExecutionBlockContext { return taskHistories; } - public void fatalError(TaskAttemptId taskAttemptId, String message) { - if (message == null) { - message = "No error message"; + public void fatalError(TaskAttemptId taskAttemptId, Throwable error) { + if (error == null) { + error = new TajoInternalError("No error message"); } TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder() .setId(taskAttemptId.getProto()) - .setErrorMessage(message); + .setError(ErrorUtil.convertException(error)); try { //If QueryMaster does not responding, current execution block should be stop http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java index bd28bb7..ac37258 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java @@ -71,7 +71,7 @@ public class TaskContainer implements Runnable { if (task != null) { try { task.abort(); - task.getExecutionBlockContext().fatalError(task.getTaskContext().getTaskId(), e.getMessage()); + task.getExecutionBlockContext().fatalError(task.getTaskContext().getTaskId(), e); } catch (Throwable t) { LOG.fatal(t.getMessage(), t); } http://git-wip-us.apache.org/repos/asf/tajo/blob/031cf320/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java index fdd7da9..caf0262 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java @@ -31,6 +31,7 @@ import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.TaskRequestImpl; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.resource.NodeResource; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.event.NodeResourceDeallocateEvent; @@ -157,7 +158,7 @@ public class TaskExecutor extends AbstractService implements EventHandler