Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 725FF174BB for ; Fri, 8 May 2015 14:43:00 +0000 (UTC) Received: (qmail 6537 invoked by uid 500); 8 May 2015 14:43:00 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 6331 invoked by uid 500); 8 May 2015 14:43:00 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 6309 invoked by uid 99); 8 May 2015 14:43:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 May 2015 14:43:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2681CE0514; Fri, 8 May 2015 14:43:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jhkim@apache.org To: commits@tajo.apache.org Message-Id: <7f79ea75058f49ab895192f3cba8bbfc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tajo git commit: TAJO-1593: Add missing stop condition to Taskrunner. (jinho) Date: Fri, 8 May 2015 14:43:00 +0000 (UTC) Repository: tajo Updated Branches: refs/heads/master 1baf8dce6 -> ddd39213d TAJO-1593: Add missing stop condition to Taskrunner. (jinho) Closes #562 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ddd39213 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ddd39213 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ddd39213 Branch: refs/heads/master Commit: ddd39213d87fd76e9e80099e92010fd71b3d363e Parents: 1baf8dc Author: Jinho Kim Authored: Fri May 8 23:42:03 2015 +0900 Committer: Jinho Kim Committed: Fri May 8 23:42:03 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 ++ .../apache/tajo/worker/ExecutionBlockContext.java | 15 ++++++++++----- .../main/java/org/apache/tajo/worker/TaskRunner.java | 1 + .../java/org/apache/tajo/rpc/AsyncRpcServer.java | 6 +++++- .../java/org/apache/tajo/rpc/BlockingRpcServer.java | 6 +++++- 5 files changed, 23 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 9788307..0aad306 100644 --- a/CHANGES +++ b/CHANGES @@ -121,6 +121,8 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1593: Add missing stop condition to Taskrunner. (jinho) + TAJO-1556: "insert into select" with reordered column list does not work. (Contributed by Yongjin Choi, Committed by jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 270000a..0cc3304 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -35,9 +35,8 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.serder.PlanProto; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcClientManager; +import org.apache.tajo.rpc.*; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.NetUtils; @@ -50,6 +49,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -307,7 +307,10 @@ public class ExecutionBlockContext { getWorkerContext().getHashShuffleAppenderManager().close(ebId); if (shuffles == null) { reporterBuilder.addAllIntermediateEntries(intermediateEntries); - stub.doneExecutionBlock(null, reporterBuilder.build(), NullCallback.get()); + + CallFuture callFuture = new CallFuture(); + stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture); + callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); return; } @@ -340,7 +343,9 @@ public class ExecutionBlockContext { } } try { - stub.doneExecutionBlock(null, reporterBuilder.build(), NullCallback.get()); + CallFuture callFuture = new CallFuture(); + stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture); + callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Throwable e) { // can't send report to query master LOG.fatal(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index 31f25f0..774f358 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -241,6 +241,7 @@ public class TaskRunner extends AbstractService { // immediately. if (taskRequest.getShouldDie()) { LOG.info("Received ShouldDie flag:" + getId()); + break; } else { getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc(); LOG.info("Accumulated Received Task: " + (++receivedNum)); http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java index 22f47b0..134b3cf 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java @@ -142,7 +142,11 @@ public class AsyncRpcServer extends NettyServerBase { ctx.close(); } Throwable rootCause = ExceptionUtils.getRootCause(cause); - LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause); + if(rootCause == null) { + LOG.fatal(ExceptionUtils.getMessage(cause), cause); + } else { + LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause); + } } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java index 93c28e3..007ada5 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java @@ -133,7 +133,11 @@ public class BlockingRpcServer extends NettyServerBase { ctx.close(); } Throwable rootCause = ExceptionUtils.getRootCause(cause); - LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause); + if(rootCause == null) { + LOG.fatal(ExceptionUtils.getMessage(cause), cause); + } else { + LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause); + } } } }