tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [07/13] tajo git commit: TAJO-1469 allocateQueryMaster can leak resources if it times-out (3sec, hardcoded)
Date Sat, 11 Apr 2015 01:10:09 GMT
TAJO-1469 allocateQueryMaster can leak resources if it times-out (3sec, hardcoded)

Closes #480


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/69895422
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/69895422
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/69895422

Branch: refs/heads/index_support
Commit: 69895422cf2f770d897a8cbe554ea739cd091b20
Parents: a1cf248
Author: navis.ryu <navis@apache.org>
Authored: Wed Apr 8 11:25:25 2015 +0900
Committer: Hyoungjun Kim <babokim@gmail.com>
Committed: Wed Apr 8 11:25:25 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../java/org/apache/tajo/conf/TajoConf.java     | 68 ++++++++++++++++++
 .../master/rm/TajoWorkerResourceManager.java    | 32 +++++++--
 .../apache/tajo/rpc/CancelableRpcCallback.java  | 74 ++++++++++++++++++++
 4 files changed, 170 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/69895422/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 523fb45..fba55e9 100644
--- a/CHANGES
+++ b/CHANGES
@@ -74,6 +74,9 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1469: allocateQueryMaster can leak resources if it times-out (3sec, 
+    hardcoded) (Contributed by navis, Committed by hyoungjun)
+
     TAJO-1538: TajoWorkerResourceManager.allocatedResourceMap is increasing 
     forever. (Contributed by navis. Committed by jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/69895422/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index e892dc9..e74d842 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -40,6 +40,7 @@ import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
 
 public class TajoConf extends Configuration {
   private static TimeZone SYSTEM_TIMEZONE;
@@ -154,6 +155,7 @@ public class TajoConf extends Configuration {
     // QueryMaster resource
     TAJO_QUERYMASTER_DISK_SLOT("tajo.qm.resource.disk.slots", 0.0f, Validators.min("0.0f")),
     TAJO_QUERYMASTER_MEMORY_MB("tajo.qm.resource.memory-mb", 512, Validators.min("64")),
+    TAJO_QUERYMASTER_ALLOCATION_TIMEOUT("tajo.qm.resource.allocation.timeout", "3 sec"),
 
     // Tajo Worker Service Addresses
     WORKER_INFO_ADDRESS("tajo.worker.info-http.address", "0.0.0.0:28080", Validators.networkAddr()),
@@ -580,6 +582,72 @@ public class TajoConf extends Configuration {
     setBoolVar(this, var, val);
   }
 
+  // borrowed from HIVE-5799
+  public static long getTimeVar(Configuration conf, ConfVars var, TimeUnit outUnit) {
+    return toTime(getVar(conf, var), outUnit);
+  }
+
+  public static void setTimeVar(Configuration conf, ConfVars var, long time, TimeUnit timeunit) {
+    assert (var.valClass == String.class) : var.varname;
+    conf.set(var.varname, time + stringFor(timeunit));
+  }
+
+  public long getTimeVar(ConfVars var, TimeUnit outUnit) {
+    return getTimeVar(this, var, outUnit);
+  }
+
+  public void setTimeVar(ConfVars var, long time, TimeUnit outUnit) {
+    setTimeVar(this, var, time, outUnit);
+  }
+
+  public static long toTime(String value, TimeUnit outUnit) {
+    String[] parsed = parseTime(value.trim());
+    return outUnit.convert(Long.valueOf(parsed[0].trim()), unitFor(parsed[1].trim()));
+  }
+
+  private static String[] parseTime(String value) {
+    char[] chars = value.toCharArray();
+    int i = 0;
+    for (; i < chars.length && (chars[i] == '-' || Character.isDigit(chars[i])); i++) {
+    }
+    return new String[] {value.substring(0, i), value.substring(i)};
+  }
+
+  public static TimeUnit unitFor(String unit) {
+    unit = unit.trim().toLowerCase();
+    if (unit.isEmpty() || unit.equals("l")) {
+      return TimeUnit.MILLISECONDS;
+    } else if (unit.equals("d") || unit.startsWith("day")) {
+      return TimeUnit.DAYS;
+    } else if (unit.equals("h") || unit.startsWith("hour")) {
+      return TimeUnit.HOURS;
+    } else if (unit.equals("m") || unit.startsWith("min")) {
+      return TimeUnit.MINUTES;
+    } else if (unit.equals("s") || unit.startsWith("sec")) {
+      return TimeUnit.SECONDS;
+    } else if (unit.equals("ms") || unit.startsWith("msec")) {
+      return TimeUnit.MILLISECONDS;
+    } else if (unit.equals("us") || unit.startsWith("usec")) {
+      return TimeUnit.MICROSECONDS;
+    } else if (unit.equals("ns") || unit.startsWith("nsec")) {
+      return TimeUnit.NANOSECONDS;
+    }
+    throw new IllegalArgumentException("Invalid time unit " + unit);
+  }
+
+  public static String stringFor(TimeUnit timeunit) {
+    switch (timeunit) {
+      case DAYS: return "day";
+      case HOURS: return "hour";
+      case MINUTES: return "min";
+      case SECONDS: return "sec";
+      case MILLISECONDS: return "msec";
+      case MICROSECONDS: return "usec";
+      case NANOSECONDS: return "nsec";
+    }
+    throw new IllegalArgumentException("Invalid timeunit " + timeunit);
+  }
+
   public void setClassVar(ConfVars var, Class<?> clazz) {
     setVar(var, clazz.getCanonicalName());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/69895422/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 541b1b6..091da2f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -37,7 +37,7 @@ import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
 import org.apache.tajo.master.QueryInProgress;
 import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.CancelableRpcCallback;
 import org.apache.tajo.util.ApplicationIdUtils;
 
 import java.io.IOException;
@@ -209,23 +209,41 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
 
   @Override
   public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress) {
+
+    // 3 seconds, by default
+    long timeout = masterContext.getConf().getTimeVar(
+        TajoConf.ConfVars.TAJO_QUERYMASTER_ALLOCATION_TIMEOUT, TimeUnit.MILLISECONDS);
+
     // Create a resource request for a query master
     WorkerResourceAllocationRequest qmResourceRequest = createQMResourceRequest(queryInProgress.getQueryId());
 
     // call future for async call
-    CallFuture<WorkerResourceAllocationResponse> callFuture = new CallFuture<WorkerResourceAllocationResponse>();
+    final CancelableRpcCallback<WorkerResourceAllocationResponse> callFuture =
+        new CancelableRpcCallback<WorkerResourceAllocationResponse>() {
+          @Override
+          protected void cancel(WorkerResourceAllocationResponse canceled) {
+            if (canceled != null && !canceled.getWorkerAllocatedResourceList().isEmpty()) {
+              LOG.info("Canceling resources allocated");
+              WorkerAllocatedResource resource = canceled.getWorkerAllocatedResource(0);
+              releaseWorkerResource(resource.getContainerId());
+            }
+          }
+        };
     allocateWorkerResources(qmResourceRequest, callFuture);
 
-    // Wait for 3 seconds
     WorkerResourceAllocationResponse response = null;
     try {
-      response = callFuture.get(3, TimeUnit.SECONDS);
+      response = callFuture.get(timeout, TimeUnit.MILLISECONDS);
     } catch (Throwable t) {
-      LOG.error(t, t);
-      return null;
+      response = callFuture.cancel(); // try cancel
+      if (response == null) {
+        // canceled successfuly
+        LOG.warn("Got exception waiting resources for query master " + queryInProgress.getQueryId(), t);
+        return null;
+      }
     }
 
-    if (response.getWorkerAllocatedResourceList().size() == 0) {
+    if (response == null || response.getWorkerAllocatedResourceList().size() == 0) {
       return null;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/69895422/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CancelableRpcCallback.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CancelableRpcCallback.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CancelableRpcCallback.java
new file mode 100644
index 0000000..80bf76c
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CancelableRpcCallback.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.RpcCallback;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+// message exchange between threads which can be only a success(run) or fail(cancel)
+// successful cancel will make all following run() invocations to cancel the object, by calling cancel(T)
+public class CancelableRpcCallback<T> implements RpcCallback<T> {
+
+  private static final int INITIAL = 0;
+  private static final int RESULT = 1;
+  private static final int CANCELED = 2;
+
+  private volatile T result;
+  private final AtomicInteger state = new AtomicInteger(INITIAL);
+  private final Semaphore semaphore = new Semaphore(0);
+
+  @Override
+  public void run(T result) {
+    assert result != null;
+    try {
+      if (state.compareAndSet(INITIAL, RESULT)) {
+        this.result = result;
+      } else {
+        cancel(result);
+      }
+    } finally {
+      semaphore.release();
+    }
+  }
+
+  public T cancel() {
+    try {
+      if (state.compareAndSet(INITIAL, CANCELED)) {
+        return null;
+      }
+      return state.get() == RESULT ? result : null;
+    } finally {
+      semaphore.release();
+    }
+  }
+
+  public T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
+    if (semaphore.tryAcquire(timeout, unit) && state.get() == RESULT) {
+      return result;
+    }
+    throw new TimeoutException();
+  }
+
+  protected void cancel(T canceled) {
+  }
+}


Mime
View raw message