tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject tajo git commit: TAJO-1356: Race conditions in QueryInProgress. (jinho)
Date Mon, 23 Feb 2015 11:17:14 GMT
Repository: tajo
Updated Branches:
  refs/heads/branch-0.10.0 5c5e51963 -> f89de1dca


TAJO-1356: Race conditions in QueryInProgress. (jinho)

Closes #386


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f89de1dc
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f89de1dc
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f89de1dc

Branch: refs/heads/branch-0.10.0
Commit: f89de1dca7cbdca651897d987ac9105a762f8e5d
Parents: 5c5e519
Author: jhkim <jhkim@apache.org>
Authored: Mon Feb 23 20:15:48 2015 +0900
Committer: jhkim <jhkim@apache.org>
Committed: Mon Feb 23 20:15:48 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../org/apache/tajo/master/QueryInProgress.java | 70 +++++++++++++-------
 .../org/apache/tajo/master/QueryManager.java    |  8 +--
 .../apache/tajo/querymaster/QueryJobEvent.java  |  1 -
 4 files changed, 48 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/f89de1dc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0166397..ec03000 100644
--- a/CHANGES
+++ b/CHANGES
@@ -185,6 +185,8 @@ Release 0.10.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1356: Race conditions in QueryInProgress. (jinho)
+
     TAJO-1277: GreedyHeuristicJoinOrderAlgorithm sometimes wrongly assumes 
     associativity of joins. (Keuntae Park via jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f89de1dc/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index 45bdc5a..9e50797 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -31,7 +31,6 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
 import org.apache.tajo.master.rm.WorkerResourceManager;
 import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.querymaster.QueryJobEvent;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
@@ -41,6 +40,9 @@ import org.apache.tajo.util.NetUtils;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class QueryInProgress {
   private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
@@ -63,6 +65,9 @@ public class QueryInProgress {
 
   private QueryMasterProtocolService queryMasterRpcClient;
 
+  private final Lock readLock;
+  private final Lock writeLock;
+
   public QueryInProgress(
       TajoMaster.MasterContext masterContext,
       Session session,
@@ -76,16 +81,23 @@ public class QueryInProgress {
 
     queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
     queryInfo.setStartTime(System.currentTimeMillis());
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
   }
 
-  public synchronized void kill() {
-    getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
-    if (queryMasterRpcClient != null) {
-      try {
+  public void kill() {
+    writeLock.lock();
+    try {
+      getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
+      if (queryMasterRpcClient != null) {
         queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
-      } catch (Throwable e) {
-        catchException(e);
       }
+    } catch (Throwable e) {
+      catchException(e);
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -112,14 +124,14 @@ public class QueryInProgress {
 
   public boolean startQueryMaster() {
     try {
+      writeLock.lockInterruptibly();
       LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
       WorkerResourceManager resourceManager = masterContext.getResourceManager();
       WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
 
       // if no resource to allocate a query master
       if(resource == null) {
-        LOG.info("No Available Resources for QueryMaster");
-        return false;
+        throw new RuntimeException("No Available Resources for QueryMaster");
       }
 
       queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
@@ -131,6 +143,8 @@ public class QueryInProgress {
     } catch (Exception e) {
       catchException(e);
       return false;
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -142,12 +156,14 @@ public class QueryInProgress {
     queryMasterRpcClient = queryMasterRpc.getStub();
   }
 
-  public synchronized void submmitQueryToMaster() {
+  public void submmitQueryToMaster() {
     if(querySubmitted.get()) {
       return;
     }
 
     try {
+      writeLock.lockInterruptibly();
+
       if(queryMasterRpcClient == null) {
         connectQueryMaster();
       }
@@ -171,6 +187,8 @@ public class QueryInProgress {
       getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -185,7 +203,12 @@ public class QueryInProgress {
   }
 
   public QueryInfo getQueryInfo() {
-    return this.queryInfo;
+    readLock.lock();
+    try {
+      return this.queryInfo;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public boolean isStarted() {
@@ -195,17 +218,8 @@ public class QueryInProgress {
   public void heartbeat(QueryInfo queryInfo) {
     LOG.info("Received QueryMaster heartbeat:" + queryInfo);
 
-    // to avoid partial update by different heartbeats
-    synchronized (this.queryInfo) {
-
-      // terminal state will let client to retrieve a query result
-      // So, we must set the query result before changing query state
-      if (isFinishState(queryInfo.getQueryState())) {
-        if (queryInfo.hasResultdesc()) {
-          this.queryInfo.setResultDesc(queryInfo.getResultDesc());
-        }
-      }
-
+    writeLock.lock();
+    try {
       this.queryInfo.setQueryState(queryInfo.getQueryState());
       this.queryInfo.setProgress(queryInfo.getProgress());
 
@@ -220,12 +234,18 @@ public class QueryInProgress {
         LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
       }
 
-
+      // terminal state will let client to retrieve a query result
+      // So, we must set the query result before changing query state
       if (isFinishState(this.queryInfo.getQueryState())) {
+        if (queryInfo.hasResultdesc()) {
+          this.queryInfo.setResultDesc(queryInfo.getResultDesc());
+        }
+
         this.queryInfo.setFinishTime(System.currentTimeMillis());
-        masterContext.getQueryJobManager().getEventHandler().handle(
-            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
+        masterContext.getQueryJobManager().stopQuery(queryInfo.getQueryId());
       }
+    } finally {
+      writeLock.unlock();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f89de1dc/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index 8070a7c..a502e4b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -203,8 +203,7 @@ public class QueryManager extends CompositeService {
       dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
           queryInProgress.getQueryInfo()));
     } else {
-      dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP,
-          queryInProgress.getQueryInfo()));
+      masterContext.getQueryJobManager().stopQuery(queryInProgress.getQueryId());
     }
 
     return queryInProgress.getQueryInfo();
@@ -216,19 +215,14 @@ public class QueryManager extends CompositeService {
     public void handle(QueryJobEvent event) {
       QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
 
-
       if (queryInProgress == null) {
         LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
         return;
       }
 
-
       if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
         queryInProgress.submmitQueryToMaster();
 
-      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
-        stopQuery(event.getQueryInfo().getQueryId());
-
       } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
         scheduler.removeQuery(queryInProgress.getQueryId());
         queryInProgress.kill();

http://git-wip-us.apache.org/repos/asf/tajo/blob/f89de1dc/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
index 27eb2b6..91a82f6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
@@ -37,7 +37,6 @@ public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
   public enum Type {
     QUERY_MASTER_START,
     QUERY_JOB_HEARTBEAT,
-    QUERY_JOB_STOP,
     QUERY_JOB_KILL
   }
 }


Mime
View raw message