tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [10/13] tajo git commit: TAJO-1385: Remove locking on RMContext.
Date Sat, 11 Apr 2015 01:10:12 GMT
TAJO-1385: Remove locking on RMContext.

Signed-off-by: Jihoon Son <jihoonson@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/db912712
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/db912712
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/db912712

Branch: refs/heads/index_support
Commit: db912712d338fa61472a0b78f304d1ce8cff3ae0
Parents: 67a3117
Author: navis.ryu <navis@apache.org>
Authored: Fri Apr 10 22:38:15 2015 +0900
Committer: Jihoon Son <jihoonson@apache.org>
Committed: Fri Apr 10 22:38:15 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../java/org/apache/tajo/util/BasicFuture.java  | 104 +++++++
 .../tajo/master/rm/TajoResourceTracker.java     |  49 +---
 .../master/rm/TajoWorkerResourceManager.java    | 279 +++++++++++--------
 .../java/org/apache/tajo/master/rm/Worker.java  |  26 +-
 .../tajo/master/rm/WorkerResourceManager.java   |   8 +-
 .../main/java/org/apache/tajo/rpc/RpcUtils.java |   9 +-
 7 files changed, 305 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/db912712/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 706cb2f..9fe35f5 100644
--- a/CHANGES
+++ b/CHANGES
@@ -14,6 +14,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1385: Remove locking on RMContext. (Contributed by navis, 
+    Committed by jihoon)
+
     TAJO-1499: Check the bind status when EvalNode::eval() is called. (jihoon)
 
     TAJO-1400: Add TajoStatement::setMaxRows method support.

http://git-wip-us.apache.org/repos/asf/tajo/blob/db912712/tajo-common/src/main/java/org/apache/tajo/util/BasicFuture.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/BasicFuture.java b/tajo-common/src/main/java/org/apache/tajo/util/BasicFuture.java
new file mode 100644
index 0000000..e857817
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/BasicFuture.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class BasicFuture<T> implements Future<T> {
+
+  private T result;
+  private Exception exception;
+  private boolean finished;
+
+  @Override
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    return false;
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return false;
+  }
+
+  @Override
+  public synchronized boolean isDone() {
+    return finished;
+  }
+
+  @Override
+  public synchronized T get() throws InterruptedException, ExecutionException {
+    while (!finished) {
+      wait();
+    }
+    if (exception != null) {
+      throw new ExecutionException(exception);
+    }
+    return result;
+  }
+
+  @Override
+  public synchronized T get(long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    long remain = unit.toMillis(timeout);
+    long prev = System.currentTimeMillis();
+    while (!finished && remain > 0) {
+      wait(remain);
+      long current = System.currentTimeMillis();
+      remain -= current - prev;
+      prev = current;
+    }
+    if (!finished) {
+      throw new TimeoutException("Timed-out");
+    }
+    if (exception != null) {
+      throw new ExecutionException(exception);
+    }
+    return result;
+  }
+
+  public synchronized boolean done(T result) {
+    try {
+      if (finished) {
+        return false;
+      }
+      this.result = result;
+      this.finished = true;
+      return true;
+    } finally {
+      notifyAll();
+    }
+  }
+
+  public synchronized boolean failed(Exception ex) {
+    try {
+      if (finished) {
+        return false;
+      }
+      this.exception = ex;
+      this.finished = true;
+      return true;
+    } finally {
+      notifyAll();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/db912712/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 334dbf6..ba021fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
@@ -57,6 +56,8 @@ import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTracke
 public class TajoResourceTracker extends AbstractService implements TajoResourceTrackerProtocolService.Interface {
   /** Class logger */
   private Log LOG = LogFactory.getLog(TajoResourceTracker.class);
+
+  private final WorkerResourceManager manager;
   /** the context of TajoWorkerResourceManager */
   private final TajoRMContext rmContext;
   /** Liveliness monitor which checks ping expiry times of workers */
@@ -67,9 +68,10 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
   /** The bind address of RPC server of worker resource tracker */
   private InetSocketAddress bindAddress;
 
-  public TajoResourceTracker(TajoRMContext rmContext, WorkerLivelinessMonitor workerLivelinessMonitor) {
+  public TajoResourceTracker(WorkerResourceManager manager, WorkerLivelinessMonitor workerLivelinessMonitor) {
     super(TajoResourceTracker.class.getSimpleName());
-    this.rmContext = rmContext;
+    this.manager = manager;
+    this.rmContext = manager.getRMContext();
     this.workerLivelinessMonitor = workerLivelinessMonitor;
   }
 
@@ -175,7 +177,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
       }
 
     } finally {
-      builder.setClusterResourceSummary(getClusterResourceSummary());
+      builder.setClusterResourceSummary(manager.getClusterResourceSummary());
       done.run(builder.build());
     }
   }
@@ -199,43 +201,4 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
 
     return new Worker(rmContext, workerResource, new WorkerConnectionInfo(request.getConnectionInfo()));
   }
-
-  public ClusterResourceSummary getClusterResourceSummary() {
-    int totalDiskSlots = 0;
-    int totalCpuCoreSlots = 0;
-    int totalMemoryMB = 0;
-
-    int totalAvailableDiskSlots = 0;
-    int totalAvailableCpuCoreSlots = 0;
-    int totalAvailableMemoryMB = 0;
-
-    synchronized(rmContext) {
-      for(int eachWorker: rmContext.getWorkers().keySet()) {
-        Worker worker = rmContext.getWorkers().get(eachWorker);
-
-        if(worker != null) {
-          WorkerResource resource = worker.getResource();
-
-          totalMemoryMB += resource.getMemoryMB();
-          totalAvailableMemoryMB += resource.getAvailableMemoryMB();
-
-          totalDiskSlots += resource.getDiskSlots();
-          totalAvailableDiskSlots += resource.getAvailableDiskSlots();
-
-          totalCpuCoreSlots += resource.getCpuCoreSlots();
-          totalAvailableCpuCoreSlots += resource.getAvailableCpuCoreSlots();
-        }
-      }
-    }
-
-    return ClusterResourceSummary.newBuilder()
-        .setNumWorkers(rmContext.getWorkers().size())
-        .setTotalCpuCoreSlots(totalCpuCoreSlots)
-        .setTotalDiskSlots(totalDiskSlots)
-        .setTotalMemoryMB(totalMemoryMB)
-        .setTotalAvailableCpuCoreSlots(totalAvailableCpuCoreSlots)
-        .setTotalAvailableDiskSlots(totalAvailableDiskSlots)
-        .setTotalAvailableMemoryMB(totalAvailableMemoryMB)
-        .build();
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/db912712/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 091da2f..90a4eb5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -38,14 +38,13 @@ import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
 import org.apache.tajo.master.QueryInProgress;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.rpc.CancelableRpcCallback;
+import org.apache.tajo.rpc.RpcUtils;
 import org.apache.tajo.util.ApplicationIdUtils;
+import org.apache.tajo.util.BasicFuture;
 
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -72,7 +71,10 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
    */
   private WorkerLivelinessMonitor workerLivelinessMonitor;
 
-  private BlockingQueue<WorkerResourceRequest> requestQueue;
+  private final BlockingQueue<WorkerResourceRequest> requestQueue =
+      new LinkedBlockingDeque<WorkerResourceRequest>();
+  private final RpcUtils.Scrutineer<BasicFuture<ClusterResourceSummary>> summaryRequest =
+      new RpcUtils.Scrutineer<BasicFuture<ClusterResourceSummary>>();
 
   private AtomicBoolean stopped = new AtomicBoolean(false);
 
@@ -105,8 +107,6 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
 
     this.queryIdSeed = String.valueOf(System.currentTimeMillis());
 
-    requestQueue = new LinkedBlockingDeque<WorkerResourceRequest>();
-
     workerResourceAllocator = new WorkerResourceAllocationThread();
     workerResourceAllocator.start();
 
@@ -116,7 +116,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
     // Register event handler for Workers
     rmContext.getDispatcher().register(WorkerEventType.class, new WorkerEventDispatcher(rmContext));
 
-    resourceTracker = new TajoResourceTracker(rmContext, workerLivelinessMonitor);
+    resourceTracker = new TajoResourceTracker(this, workerLivelinessMonitor);
     addIfService(resourceTracker);
 
     super.serviceInit(systemConf);
@@ -160,11 +160,6 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
   }
 
   @Override
-  public ClusterResourceSummary getClusterResourceSummary() {
-    return resourceTracker.getClusterResourceSummary();
-  }
-
-  @Override
   public void serviceStop() throws Exception {
     if(stopped.get()) {
       return;
@@ -289,13 +284,23 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
     float allocatedDiskSlots;
   }
 
+  private static final long QUEUE_POLLING_TIME = 100;
+
   class WorkerResourceAllocationThread extends Thread {
     @Override
     public void run() {
       LOG.info("WorkerResourceAllocationThread start");
       while(!stopped.get()) {
+        BasicFuture<ClusterResourceSummary> future = summaryRequest.expire();
+        if (future != null) {
+          future.done(makeClusterResourceSummary());
+        }
         try {
-          WorkerResourceRequest resourceRequest = requestQueue.take();
+          WorkerResourceRequest resourceRequest = requestQueue.poll(
+              QUEUE_POLLING_TIME, TimeUnit.MILLISECONDS);
+          if (resourceRequest == null) {
+            continue;
+          }
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("allocateWorkerResources:" +
@@ -351,13 +356,13 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
               if(LOG.isDebugEnabled()) {
                 LOG.debug("=========================================");
                 LOG.debug("Available Workers");
-                for(int liveWorker: rmContext.getWorkers().keySet()) {
-                  LOG.debug(rmContext.getWorkers().get(liveWorker).toString());
+                for(Worker worker: rmContext.getWorkers().values()) {
+                  LOG.debug(worker.toString());
                 }
                 LOG.debug("=========================================");
               }
               requestQueue.put(resourceRequest);
-              Thread.sleep(100);
+              Thread.sleep(QUEUE_POLLING_TIME);
             }
           }
         } catch(InterruptedException ie) {
@@ -369,6 +374,54 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
     }
   }
 
+  private static final long MAX_WAIT_TIME = 10000;
+
+  public ClusterResourceSummary getClusterResourceSummary() {
+    BasicFuture<ClusterResourceSummary> future =
+        summaryRequest.check(new BasicFuture<ClusterResourceSummary>());
+    try {
+      return future.get(MAX_WAIT_TIME, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      LOG.warn("Failed to get cluster summary by exception", e);
+    }
+    return null;
+  }
+
+  private ClusterResourceSummary makeClusterResourceSummary() {
+
+    int totalDiskSlots = 0;
+    int totalCpuCoreSlots = 0;
+    int totalMemoryMB = 0;
+
+    int totalAvailableDiskSlots = 0;
+    int totalAvailableCpuCoreSlots = 0;
+    int totalAvailableMemoryMB = 0;
+
+    for(Worker worker: rmContext.getWorkers().values()) {
+
+      WorkerResource resource = worker.getResource();
+
+      totalMemoryMB += resource.getMemoryMB();
+      totalAvailableMemoryMB += resource.getAvailableMemoryMB();
+
+      totalDiskSlots += resource.getDiskSlots();
+      totalAvailableDiskSlots += resource.getAvailableDiskSlots();
+
+      totalCpuCoreSlots += resource.getCpuCoreSlots();
+      totalAvailableCpuCoreSlots += resource.getAvailableCpuCoreSlots();
+    }
+
+    return ClusterResourceSummary.newBuilder()
+        .setNumWorkers(rmContext.getWorkers().size())
+        .setTotalCpuCoreSlots(totalCpuCoreSlots)
+        .setTotalDiskSlots(totalDiskSlots)
+        .setTotalMemoryMB(totalMemoryMB)
+        .setTotalAvailableCpuCoreSlots(totalAvailableCpuCoreSlots)
+        .setTotalAvailableDiskSlots(totalAvailableDiskSlots)
+        .setTotalAvailableMemoryMB(totalAvailableMemoryMB)
+        .build();
+  }
+
  private List<AllocatedWorkerResource> chooseWorkers(WorkerResourceRequest resourceRequest) {
     List<AllocatedWorkerResource> selectedWorkers = new ArrayList<AllocatedWorkerResource>();
 
@@ -377,141 +430,133 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
     ResourceRequestPriority resourceRequestPriority
       = resourceRequest.request.getResourceRequestPriority();
 
+    List<Worker> randomWorkers = new ArrayList<Worker>(rmContext.getWorkers().values());
+    Collections.shuffle(randomWorkers);
+
     if(resourceRequestPriority == ResourceRequestPriority.MEMORY) {
-      synchronized(rmContext) {
-        List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet());
-        Collections.shuffle(randomWorkers);
-
-        int numContainers = resourceRequest.request.getNumContainers();
-        int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer();
-        int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer();
-        float diskSlot = Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(),
+
+      int numContainers = resourceRequest.request.getNumContainers();
+      int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer();
+      int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer();
+      float diskSlot = Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(),
           resourceRequest.request.getMinDiskSlotPerContainer());
 
-        int liveWorkerSize = randomWorkers.size();
-        Set<Integer> insufficientWorkers = new HashSet<Integer>();
-        boolean stop = false;
-        boolean checkMax = true;
-        while(!stop) {
+      int liveWorkerSize = randomWorkers.size();
+      Set<Integer> insufficientWorkers = new HashSet<Integer>();
+      boolean stop = false;
+      boolean checkMax = true;
+      while(!stop) {
+        if(allocatedResources >= numContainers) {
+          break;
+        }
+
+        if(insufficientWorkers.size() >= liveWorkerSize) {
+          if(!checkMax) {
+            break;
+          }
+          insufficientWorkers.clear();
+          checkMax = false;
+        }
+        int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB;
+
+        for(Worker worker: randomWorkers) {
           if(allocatedResources >= numContainers) {
+            stop = true;
             break;
           }
 
           if(insufficientWorkers.size() >= liveWorkerSize) {
-            if(!checkMax) {
-              break;
-            }
-            insufficientWorkers.clear();
-            checkMax = false;
+            break;
           }
-          int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB;
 
-          for(int eachWorker: randomWorkers) {
-            if(allocatedResources >= numContainers) {
-              stop = true;
-              break;
+          WorkerResource workerResource = worker.getResource();
+          if(workerResource.getAvailableMemoryMB() >= compareAvailableMemory) {
+            int workerMemory;
+            if(workerResource.getAvailableMemoryMB() >= maxMemoryMB) {
+              workerMemory = maxMemoryMB;
+            } else {
+              workerMemory = workerResource.getAvailableMemoryMB();
             }
-
-            if(insufficientWorkers.size() >= liveWorkerSize) {
-              break;
+            AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+            allocatedWorkerResource.worker = worker;
+            allocatedWorkerResource.allocatedMemoryMB = workerMemory;
+            if(workerResource.getAvailableDiskSlots() >= diskSlot) {
+              allocatedWorkerResource.allocatedDiskSlots = diskSlot;
+            } else {
+              allocatedWorkerResource.allocatedDiskSlots = workerResource.getAvailableDiskSlots();
             }
 
-            Worker worker = rmContext.getWorkers().get(eachWorker);
-            WorkerResource workerResource = worker.getResource();
-            if(workerResource.getAvailableMemoryMB() >= compareAvailableMemory) {
-              int workerMemory;
-              if(workerResource.getAvailableMemoryMB() >= maxMemoryMB) {
-                workerMemory = maxMemoryMB;
-              } else {
-                workerMemory = workerResource.getAvailableMemoryMB();
-              }
-              AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
-              allocatedWorkerResource.worker = worker;
-              allocatedWorkerResource.allocatedMemoryMB = workerMemory;
-              if(workerResource.getAvailableDiskSlots() >= diskSlot) {
-                allocatedWorkerResource.allocatedDiskSlots = diskSlot;
-              } else {
-                allocatedWorkerResource.allocatedDiskSlots = workerResource.getAvailableDiskSlots();
-              }
-
-              workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
+            workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
                 allocatedWorkerResource.allocatedMemoryMB);
 
-              selectedWorkers.add(allocatedWorkerResource);
+            selectedWorkers.add(allocatedWorkerResource);
 
-              allocatedResources++;
-            } else {
-              insufficientWorkers.add(eachWorker);
-            }
+            allocatedResources++;
+          } else {
+            insufficientWorkers.add(worker.getWorkerId());
           }
         }
       }
     } else {
-      synchronized(rmContext) {
-        List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet());
-        Collections.shuffle(randomWorkers);
-
-        int numContainers = resourceRequest.request.getNumContainers();
-        float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer();
-        float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer();
-        int memoryMB = Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(),
+      int numContainers = resourceRequest.request.getNumContainers();
+      float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer();
+      float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer();
+      int memoryMB = Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(),
           resourceRequest.request.getMinMemoryMBPerContainer());
 
-        int liveWorkerSize = randomWorkers.size();
-        Set<Integer> insufficientWorkers = new HashSet<Integer>();
-        boolean stop = false;
-        boolean checkMax = true;
-        while(!stop) {
+      int liveWorkerSize = randomWorkers.size();
+      Set<Integer> insufficientWorkers = new HashSet<Integer>();
+      boolean stop = false;
+      boolean checkMax = true;
+      while(!stop) {
+        if(allocatedResources >= numContainers) {
+          break;
+        }
+
+        if(insufficientWorkers.size() >= liveWorkerSize) {
+          if(!checkMax) {
+            break;
+          }
+          insufficientWorkers.clear();
+          checkMax = false;
+        }
+        float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots;
+
+        for(Worker worker: randomWorkers) {
           if(allocatedResources >= numContainers) {
+            stop = true;
             break;
           }
 
           if(insufficientWorkers.size() >= liveWorkerSize) {
-            if(!checkMax) {
-              break;
-            }
-            insufficientWorkers.clear();
-            checkMax = false;
+            break;
           }
-          float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots;
 
-          for(int eachWorker: randomWorkers) {
-            if(allocatedResources >= numContainers) {
-              stop = true;
-              break;
+          WorkerResource workerResource = worker.getResource();
+          if(workerResource.getAvailableDiskSlots() >= compareAvailableDisk) {
+            float workerDiskSlots;
+            if(workerResource.getAvailableDiskSlots() >= maxDiskSlots) {
+              workerDiskSlots = maxDiskSlots;
+            } else {
+              workerDiskSlots = workerResource.getAvailableDiskSlots();
             }
+            AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+            allocatedWorkerResource.worker = worker;
+            allocatedWorkerResource.allocatedDiskSlots = workerDiskSlots;
 
-            if(insufficientWorkers.size() >= liveWorkerSize) {
-              break;
+            if(workerResource.getAvailableMemoryMB() >= memoryMB) {
+              allocatedWorkerResource.allocatedMemoryMB = memoryMB;
+            } else {
+              allocatedWorkerResource.allocatedMemoryMB = workerResource.getAvailableMemoryMB();
             }
-
-            Worker worker = rmContext.getWorkers().get(eachWorker);
-            WorkerResource workerResource = worker.getResource();
-            if(workerResource.getAvailableDiskSlots() >= compareAvailableDisk) {
-              float workerDiskSlots;
-              if(workerResource.getAvailableDiskSlots() >= maxDiskSlots) {
-                workerDiskSlots = maxDiskSlots;
-              } else {
-                workerDiskSlots = workerResource.getAvailableDiskSlots();
-              }
-              AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
-              allocatedWorkerResource.worker = worker;
-              allocatedWorkerResource.allocatedDiskSlots = workerDiskSlots;
-
-              if(workerResource.getAvailableMemoryMB() >= memoryMB) {
-                allocatedWorkerResource.allocatedMemoryMB = memoryMB;
-              } else {
-                allocatedWorkerResource.allocatedMemoryMB = workerResource.getAvailableMemoryMB();
-              }
-              workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
+            workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
                 allocatedWorkerResource.allocatedMemoryMB);
 
-              selectedWorkers.add(allocatedWorkerResource);
+            selectedWorkers.add(allocatedWorkerResource);
 
-              allocatedResources++;
-            } else {
-              insufficientWorkers.add(eachWorker);
-            }
+            allocatedResources++;
+          } else {
+            insufficientWorkers.add(worker.getWorkerId());
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/db912712/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
index a2ab598..6535688 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
@@ -44,7 +44,7 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
   private long lastHeartbeatTime;
 
   /** Resource capability */
-  private WorkerResource resource;
+  private final WorkerResource resource;
 
   /** Worker connection information */
   private WorkerConnectionInfo connectionInfo;
@@ -210,20 +210,26 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
         throw new IllegalArgumentException("event should be a WorkerStatusEvent type.");
       }
       WorkerStatusEvent statusEvent = (WorkerStatusEvent) event;
-
-      // TODO - the synchronization scope using rmContext is too coarsen.
-      synchronized (worker.rmContext) {
-        worker.setLastHeartbeatTime(System.currentTimeMillis());
-        worker.getResource().setNumRunningTasks(statusEvent.getRunningTaskNum());
-        worker.getResource().setMaxHeap(statusEvent.maxHeap());
-        worker.getResource().setFreeHeap(statusEvent.getFreeHeap());
-        worker.getResource().setTotalHeap(statusEvent.getTotalHeap());
-      }
+      worker.updateStatus(statusEvent);
 
       return WorkerState.RUNNING;
     }
   }
 
+  private void updateStatus(WorkerStatusEvent statusEvent) {
+    this.writeLock.lock();
+
+    try {
+      lastHeartbeatTime = System.currentTimeMillis();
+      resource.setNumRunningTasks(statusEvent.getRunningTaskNum());
+      resource.setMaxHeap(statusEvent.maxHeap());
+      resource.setFreeHeap(statusEvent.getFreeHeap());
+      resource.setTotalHeap(statusEvent.getTotalHeap());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
   public static class DeactivateNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
     private final WorkerState finalState;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/db912712/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 662b699..3d5e062 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -105,5 +105,11 @@ public interface WorkerResourceManager extends Service {
    *
    * @return WorkerIds on which QueryMasters are running
    */
-  Collection<Integer> getQueryMasters();
+  public Collection<Integer> getQueryMasters();
+
+  /**
+   *
+   * @return RMContext
+   */
+  public TajoRMContext getRMContext();
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/db912712/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java
index 152d426..99905b9 100644
--- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java
@@ -101,11 +101,16 @@ public class RpcUtils {
     }
   }
 
+  // non-blocking lock which passes only a ticket before cleared or removed
   public static class Scrutineer<T> {
 
     private final AtomicReference<T> reference = new AtomicReference<T>();
 
-    T check(T ticket) {
+    public T expire() {
+      return reference.getAndSet(null);
+    }
+
+    public T check(T ticket) {
       T granted = reference.get();
       for (;granted == null; granted = reference.get()) {
         if (reference.compareAndSet(null, ticket)) {
@@ -115,7 +120,7 @@ public class RpcUtils {
       return granted;
     }
 
-    boolean clear(T granted) {
+    public boolean clear(T granted) {
       return reference.compareAndSet(granted, null);
     }
   }


Mime
View raw message