tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject tajo git commit: TAJO-1935: Some Tasks don't work after they become TA_ASSIGNED.
Date Wed, 18 Nov 2015 04:57:35 GMT
Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.1 7734e06e5 -> a0fb954da


TAJO-1935: Some Tasks don't work after they become TA_ASSIGNED.


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a0fb954d
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a0fb954d
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a0fb954d

Branch: refs/heads/branch-0.11.1
Commit: a0fb954da92f3945b2b420c322b6344612d3924b
Parents: 7734e06
Author: Jinho Kim <jhkim@apache.org>
Authored: Wed Nov 18 13:57:04 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Wed Nov 18 13:57:04 2015 +0900

----------------------------------------------------------------------
 CHANGES                                               |  2 ++
 .../java/org/apache/tajo/master/QueryInProgress.java  |  6 ++----
 .../apache/tajo/querymaster/DefaultTaskScheduler.java | 14 +++++++-------
 .../java/org/apache/tajo/querymaster/QueryMaster.java | 11 ++++++-----
 .../org/apache/tajo/worker/ExecutionBlockContext.java | 12 +++++++-----
 .../org/apache/tajo/worker/NodeStatusUpdater.java     |  6 +-----
 .../main/java/org/apache/tajo/worker/TaskManager.java |  5 +----
 .../main/java/org/apache/tajo/rpc/RpcConstants.java   |  1 -
 8 files changed, 26 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/a0fb954d/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index d561844..b89e3d8 100644
--- a/CHANGES
+++ b/CHANGES
@@ -14,6 +14,8 @@ Release 0.11.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1935: Some Tasks don't work after they become TA_ASSIGNED. (jinho)
+
     TAJO-1977: Cannot recognize the space-contained tablename and databasename.
     (Contributed by Dongkyu Hwangbo, committed by jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/a0fb954d/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index 8e999c3..ba70bad 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -35,7 +35,6 @@ import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcClientManager;
-import org.apache.tajo.rpc.RpcConstants;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.session.Session;
 import org.apache.tajo.util.NetUtils;
@@ -44,7 +43,6 @@ import org.apache.tajo.util.RpcParameterFactory;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -105,7 +103,7 @@ public class QueryInProgress {
       if (queryMasterRpcClient != null) {
         CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
         queryMasterRpcClient.killQuery(callFuture.getController(), queryId.getProto(), callFuture);
-        callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
+        callFuture.get();
       }
     } catch (Throwable e) {
       catchException("Failed to kill query " + queryId + " by exception " + e, e);
@@ -222,7 +220,7 @@ public class QueryInProgress {
 
       CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
       queryMasterRpcClient.executeQuery(callFuture.getController(), builder.build(), callFuture);
-      callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
+      callFuture.get();
 
       querySubmitted = true;
       getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);

http://git-wip-us.apache.org/repos/asf/tajo/blob/a0fb954d/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index 56f8829..eb14599 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -42,7 +42,10 @@ import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptSched
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.plan.serder.LogicalNodeSerializer;
 import org.apache.tajo.resource.NodeResources;
-import org.apache.tajo.rpc.*;
+import org.apache.tajo.rpc.AsyncRpcClient;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -55,7 +58,6 @@ import org.apache.tajo.worker.FetchImpl;
 import java.net.InetSocketAddress;
 import java.util.*;
 import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -314,7 +316,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         .setQueue(context.getMasterContext().getQueryContext().get("queue", "default")); //TODO set queue
 
     masterClientService.reserveNodeResources(callBack.getController(), request.build(), callBack);
-    NodeResourceResponse response = callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
+    NodeResourceResponse response = callBack.get();
 
     for (AllocationResourceProto resource : response.getResourceList()) {
       taskRequestEvents.add(new TaskRequestEvent(resource.getWorkerId(), resource, context.getBlockId()));
@@ -896,8 +898,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
             TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
             tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture);
 
-            BatchAllocationResponse responseProto =
-                callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
+            BatchAllocationResponse responseProto = callFuture.get();
 
             if (responseProto.getCancellationTaskCount() > 0) {
               for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) {
@@ -1015,8 +1016,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
             TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
             tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture);
 
-            BatchAllocationResponse
-                responseProto = callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
+            BatchAllocationResponse responseProto = callFuture.get();
 
             if(responseProto.getCancellationTaskCount() > 0) {
               for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/a0fb954d/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 1b90080..cd623c1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -42,7 +42,10 @@ import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService;
 import org.apache.tajo.master.event.QueryStartEvent;
 import org.apache.tajo.master.event.QueryStopEvent;
-import org.apache.tajo.rpc.*;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.RpcParameterFactory;
@@ -56,7 +59,6 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 public class QueryMaster extends CompositeService implements EventHandler {
   private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
@@ -180,8 +182,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
       masterService.getAllWorkers(callBack.getController(),
           PrimitiveProtos.NullProto.getDefaultInstance(), callBack);
 
-      WorkerConnectionsResponse connectionsProto =
-          callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
+      WorkerConnectionsResponse connectionsProto = callBack.get();
       return connectionsProto.getWorkerList();
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
@@ -300,7 +301,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
         QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
-        future.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
+        future.get();
       }  catch (Exception e) {
         //this function will be closed in new thread.
         //When tajo do stop cluster, tajo master maybe throw closed connection exception

http://git-wip-us.apache.org/repos/asf/tajo/blob/a0fb954d/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index a3cc8fc..474ca2f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -36,7 +36,10 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.rpc.*;
+import org.apache.tajo.rpc.AsyncRpcClient;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.util.Pair;
@@ -47,7 +50,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -253,7 +255,7 @@ public class ExecutionBlockContext {
       //If QueryMaster does not responding, current execution block should be stop
       CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
       getStub().fatalError(callFuture.getController(), builder.build(), callFuture);
-      callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
+      callFuture.get();
     } catch (Exception e) {
       getWorkerContext().getTaskManager().getDispatcher().getEventHandler()
           .handle(new ExecutionBlockErrorEvent(taskAttemptId.getTaskId().getExecutionBlockId(), e));
@@ -300,7 +302,7 @@ public class ExecutionBlockContext {
 
         CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
         stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture);
-        callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
+        callFuture.get();
         return;
       }
 
@@ -355,7 +357,7 @@ public class ExecutionBlockContext {
     try {
       CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
       stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture);
-      callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
+      callFuture.get();
     } catch (Throwable e) {
       // can't send report to query master
       LOG.fatal(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/a0fb954d/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
index a3b71e1..8fe602f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
@@ -32,7 +32,6 @@ import org.apache.tajo.resource.NodeResources;
 import org.apache.tajo.rpc.AsyncRpcClient;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.RpcClientManager;
-import org.apache.tajo.rpc.RpcConstants;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.util.RpcParameterFactory;
@@ -46,7 +45,6 @@ import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static org.apache.tajo.ResourceProtos.*;
 
@@ -168,11 +166,9 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
       CallFuture<NodeHeartbeatResponse> callBack = new CallFuture<NodeHeartbeatResponse>();
 
       resourceTracker.nodeHeartbeat(callBack.getController(), requestProto, callBack);
-      response = callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
+      response = callBack.get();
     } catch (InterruptedException e) {
       LOG.warn(e.getMessage());
-    } catch (TimeoutException te) {
-      LOG.warn("Heartbeat response is being delayed.", te);
     } catch (ExecutionException ee) {
       LOG.warn("TajoMaster failure: " + ee.getMessage());
       resourceTracker = null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/a0fb954d/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
index 468a44a..0f7489a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java
@@ -33,7 +33,6 @@ import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.rpc.AsyncRpcClient;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.RpcClientManager;
-import org.apache.tajo.rpc.RpcConstants;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.RpcParameterFactory;
 import org.apache.tajo.util.TUtil;
@@ -44,7 +43,6 @@ import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.tajo.ResourceProtos.*;
 
@@ -125,8 +123,7 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan
       CallFuture<ExecutionBlockContextResponse> callback = new CallFuture<ExecutionBlockContextResponse>();
       stub.getExecutionBlockContext(callback.getController(), request.build(), callback);
 
-      ExecutionBlockContextResponse contextProto =
-          callback.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
+      ExecutionBlockContextResponse contextProto = callback.get();
       ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), contextProto, client);
 
       context.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/a0fb954d/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
index 95e5ae4..fec0e6b 100644
--- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
@@ -27,7 +27,6 @@ public class RpcConstants {
 
   public static final String PING_PACKET = "TAJO";
   public static final int DEFAULT_PAUSE = 1000; // 1 sec
-  public static final int FUTURE_TIMEOUT_SECONDS_DEFAULT = 10;
 
   /** How many times the connect will retry */
   public static final String CLIENT_RETRY_NUM = "tajo.rpc.client.retry-num";


Mime
View raw message