hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prasan...@apache.org
Subject [1/2] hive git commit: HIVE-17508: Implement global execution triggers based on counters (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Date Tue, 17 Oct 2017 08:55:06 GMT
Repository: hive
Updated Branches:
  refs/heads/master 8c3f0e403 -> 7d6a51193


http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
new file mode 100644
index 0000000..f16125d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.Validator;
+
+/**
+ * Factory to create expressions
+ */
+public class ExpressionFactory {
+
+  public static Expression fromString(final String expression) {
+    if (expression == null || expression.isEmpty()) {
+      return null;
+    }
+
+    // TODO: Only ">" predicate is supported right now, this has to be extended to support expression tree when
+    // multiple conditions are required. HIVE-17622
+
+    String[] tokens = expression.split(Expression.Predicate.GREATER_THAN.getSymbol());
+    if (tokens.length != 2) {
+      throw new IllegalArgumentException("Invalid predicate in expression");
+    }
+
+    final String counterName = tokens[0].trim();
+    final String counterValueStr = tokens[1].trim();
+    if (counterName.isEmpty()) {
+      throw new IllegalArgumentException("Counter name cannot be empty!");
+    }
+
+    // look for matches in file system counters
+    long counterValue;
+    for (FileSystemCounterLimit.FSCounter fsCounter : FileSystemCounterLimit.FSCounter.values()) {
+      if (counterName.toUpperCase().endsWith(fsCounter.name())) {
+        try {
+          counterValue = getCounterValue(counterValueStr, new Validator.SizeValidator());
+          if (counterValue < 0) {
+            throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value.");
+          }
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Invalid counter value: " + counterValueStr);
+        }
+        // this is file system counter, valid and create counter
+        FileSystemCounterLimit fsCounterLimit = FileSystemCounterLimit.fromName(counterName, counterValue);
+        return createExpression(fsCounterLimit);
+      }
+    }
+
+    // look for matches in time based counters
+    for (TimeCounterLimit.TimeCounter timeCounter : TimeCounterLimit.TimeCounter.values()) {
+      if (counterName.equalsIgnoreCase(timeCounter.name())) {
+        try {
+          counterValue = getCounterValue(counterValueStr, new Validator.TimeValidator(TimeUnit.MILLISECONDS));
+          if (counterValue < 0) {
+            throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value.");
+          }
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Invalid counter value: " + counterValueStr);
+        }
+        TimeCounterLimit timeCounterLimit = new TimeCounterLimit(
+          TimeCounterLimit.TimeCounter.valueOf(counterName.toUpperCase()), counterValue);
+        return createExpression(timeCounterLimit);
+      }
+    }
+
+    // look for matches in vertex specific counters
+    for (VertexCounterLimit.VertexCounter vertexCounter : VertexCounterLimit.VertexCounter.values()) {
+      if (counterName.equalsIgnoreCase(vertexCounter.name())) {
+        try {
+          counterValue = getCounterValue(counterValueStr, null);
+          if (counterValue < 0) {
+            throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value.");
+          }
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Invalid counter value: " + counterValueStr);
+        }
+        VertexCounterLimit vertexCounterLimit = new VertexCounterLimit(
+          VertexCounterLimit.VertexCounter.valueOf(counterName.toUpperCase()), counterValue);
+        return createExpression(vertexCounterLimit);
+      }
+    }
+    // unable to create expression at this point, invalid expression
+    throw new IllegalArgumentException("Invalid expression! " + expression);
+  }
+
+  private static long getCounterValue(final String counterValueStr, final Validator validator) throws
+    NumberFormatException {
+    long counter;
+    try {
+      counter = Long.parseLong(counterValueStr);
+    } catch (NumberFormatException e) {
+      if (validator != null) {
+        if (validator instanceof Validator.SizeValidator) {
+          return HiveConf.toSizeBytes(counterValueStr);
+        } else if (validator instanceof Validator.TimeValidator) {
+          return HiveConf.toTime(counterValueStr, TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS);
+        }
+      }
+      throw e;
+    }
+    return counter;
+  }
+
+  static Expression createExpression(CounterLimit counterLimit) {
+    return new TriggerExpression(counterLimit, Expression.Predicate.GREATER_THAN);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java
new file mode 100644
index 0000000..656747e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * File system specific counters with defined limits
+ */
+public class FileSystemCounterLimit implements CounterLimit {
+
+  public enum FSCounter {
+    BYTES_READ,
+    BYTES_WRITTEN,
+    SHUFFLE_BYTES
+  }
+
+  private String scheme;
+  private FSCounter fsCounter;
+  private long limit;
+
+  public FileSystemCounterLimit(final String scheme, final FSCounter fsCounter, final long limit) {
+    this.scheme = scheme == null || scheme.isEmpty() ? "" : scheme.toUpperCase();
+    this.fsCounter = fsCounter;
+    this.limit = limit;
+  }
+
+  public static FileSystemCounterLimit fromName(final String counterName, final long limit) {
+    String counterNameStr = counterName.toUpperCase();
+    for (FSCounter fsCounter : FSCounter.values()) {
+      if (counterNameStr.endsWith(fsCounter.name())) {
+        int startIdx = counterNameStr.indexOf(fsCounter.name());
+        if (startIdx == 0) { // exact match
+          return new FileSystemCounterLimit(null, FSCounter.valueOf(counterName), limit);
+        } else {
+          String scheme = counterNameStr.substring(0, startIdx - 1);
+          // schema/counter name validation will be done in grammar as part of HIVE-17622
+          return new FileSystemCounterLimit(scheme, FSCounter.valueOf(fsCounter.name()), limit);
+        }
+      }
+    }
+
+    throw new IllegalArgumentException("Invalid counter name specified " + counterName.toUpperCase() + "");
+  }
+
+  @Override
+  public String getName() {
+    return scheme.isEmpty() ? fsCounter.name() : scheme.toUpperCase() + "_" + fsCounter.name();
+  }
+
+  @Override
+  public long getLimit() {
+    return limit;
+  }
+
+  @Override
+  public CounterLimit clone() {
+    return new FileSystemCounterLimit(scheme, fsCounter, limit);
+  }
+
+  @Override
+  public String toString() {
+    return "counter: " + getName() + " limit: " + limit;
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 31 * scheme.hashCode();
+    hash += 31 * fsCounter.hashCode();
+    hash += 31 * limit;
+    return 31 * hash;
+  }
+
+  @Override
+  public boolean equals(final Object other) {
+    if (other == null || !(other instanceof FileSystemCounterLimit)) {
+      return false;
+    }
+
+    if (other == this) {
+      return true;
+    }
+
+    FileSystemCounterLimit otherFscl = (FileSystemCounterLimit) other;
+    return scheme.equals(otherFscl.scheme) && fsCounter.equals(otherFscl.fsCounter) && limit == otherFscl.limit;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java
new file mode 100644
index 0000000..db1a037
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.metadata.Hive;
+
+/**
+ * Fetch global (non-llap) rules from metastore
+ */
+public class MetastoreGlobalTriggersFetcher implements TriggersFetcher {
+  public static final String GLOBAL_TRIGGER_NAME = "global";
+  private final MetastoreResourcePlanTriggersFetcher rpTriggersFetcher;
+
+  public MetastoreGlobalTriggersFetcher(final Hive db) {
+    this.rpTriggersFetcher = new MetastoreResourcePlanTriggersFetcher(db);
+  }
+
+  @Override
+  public List<Trigger> fetch(final String ignore) {
+    return fetch();
+  }
+
+  public List<Trigger> fetch() {
+    // TODO:
+    return rpTriggersFetcher.fetch(GLOBAL_TRIGGER_NAME);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanTriggersFetcher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanTriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanTriggersFetcher.java
new file mode 100644
index 0000000..db390f2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanTriggersFetcher.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.metadata.Hive;
+
+/**
+ * Fetch pool specific rules from metastore
+ */
+public class MetastoreResourcePlanTriggersFetcher implements TriggersFetcher {
+  private final Hive db;
+
+  public MetastoreResourcePlanTriggersFetcher(final Hive db) {
+    this.db = db;
+  }
+
+  @Override
+  public List<Trigger> fetch(final String resourcePlanName) {
+    // TODO: implement after integration.
+    return new ArrayList<>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java
new file mode 100644
index 0000000..408aa2d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
+
+/**
+ * Implementation for providing current open sessions and active trigger.
+ */
+public class SessionTriggerProvider {
+  private List<TezSessionState> openSessions = new ArrayList<>();
+  private List<Trigger> activeTriggers = new ArrayList<>();
+
+  public SessionTriggerProvider() {
+
+  }
+
+  public SessionTriggerProvider(final List<TezSessionState> openSessions, final List<Trigger> triggers) {
+    this.openSessions = openSessions;
+    this.activeTriggers = triggers;
+  }
+
+  public void setOpenSessions(final List<TezSessionState> openSessions) {
+    this.openSessions = openSessions;
+  }
+
+  public void setActiveTriggers(final List<Trigger> activeTriggers) {
+    this.activeTriggers = activeTriggers;
+  }
+
+  public List<TezSessionState> getOpenSessions() {
+    return Collections.unmodifiableList(openSessions);
+  }
+
+  public List<Trigger> getActiveTriggers() {
+    return Collections.unmodifiableList(activeTriggers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java
new file mode 100644
index 0000000..3c16e1d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Time based counters with limits
+ */
+public class TimeCounterLimit implements CounterLimit {
+  public enum TimeCounter {
+    ELAPSED_TIME,
+    EXECUTION_TIME
+  }
+
+  private TimeCounter timeCounter;
+  private long limit;
+
+  public TimeCounterLimit(final TimeCounter timeCounter, final long limit) {
+    this.timeCounter = timeCounter;
+    this.limit = limit;
+  }
+
+  @Override
+  public String getName() {
+    return timeCounter.name();
+  }
+
+  @Override
+  public long getLimit() {
+    return limit;
+  }
+
+  @Override
+  public CounterLimit clone() {
+    return new TimeCounterLimit(timeCounter, limit);
+  }
+
+  @Override
+  public String toString() {
+    return "counter: " + timeCounter.name() + " limit: " + limit;
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 31 * timeCounter.hashCode();
+    hash += 31 * limit;
+    return 31 * hash;
+  }
+
+  @Override
+  public boolean equals(final Object other) {
+    if (other == null || !(other instanceof TimeCounterLimit)) {
+      return false;
+    }
+
+    if (other == this) {
+      return true;
+    }
+
+    TimeCounterLimit otherTcl = (TimeCounterLimit) other;
+    return timeCounter.equals(otherTcl.timeCounter) && limit == otherTcl.limit;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java
new file mode 100644
index 0000000..bed0ac1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Trigger interface which gets mapped to CREATE TRIGGER .. queries. A trigger can have a name, expression and action.
+ * Trigger is a simple expression which gets evaluated during the lifecycle of query and executes an action
+ * if the expression defined in trigger evaluates to true.
+ */
public interface Trigger {

  // Action to perform when a trigger fires.
  // NOTE(review): enum constants are JVM-wide singletons but carry a mutable
  // poolName field; setPoolName() therefore mutates shared global state (two
  // concurrent MOVE_TO_POOL triggers targeting different pools would race) —
  // confirm this is intended, or move poolName out of the enum.
  enum Action {
    KILL_QUERY(""),    // kill the violating query; presumably poolName is unused here
    MOVE_TO_POOL("");  // move the query to the pool set via setPoolName()

    String poolName;

    Action(final String poolName) {
      this.poolName = poolName;
    }

    // Mutates this shared enum constant (see NOTE above) and returns it for chaining.
    public Action setPoolName(final String poolName) {
      this.poolName = poolName;
      return this;
    }

    public String getPoolName() {
      return poolName;
    }
  }

  /**
   * Based on current value, returns true if trigger is applied else false.
   *
   * @param current - current value
   * @return true if trigger got applied false otherwise
   */
  boolean apply(long current);

  /**
   * Get trigger expression
   *
   * @return expression
   */
  Expression getExpression();

  /**
   * Return the name of the trigger
   *
   * @return trigger name
   */
  String getName();

  /**
   * Return the action that will get executed when trigger expression evaluates to true
   *
   * @return action
   */
  Action getAction();

  /**
   * Return cloned copy of this trigger
   *
   * @return clone copy
   */
  Trigger clone();
}

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java
new file mode 100644
index 0000000..5cd24d5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
+
+/**
+ * Interface for handling rule violations by queries and for performing actions defined in the rules.
+ */
public interface TriggerActionHandler {
  /**
   * Applies the action defined in the rule for the specified queries.
   *
   * @param queriesViolated map from the Tez session running a violating query to the
   *                        {@link Trigger.Action} that should be applied to it
   */
  void applyAction(Map<TezSessionState, Trigger.Action> queriesViolated);
}

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java
new file mode 100644
index 0000000..6146fc0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.ql.QueryInfo;
+
+/**
+ * Some context information that are required for rule evaluation.
+ */
/**
 * Some context information that are required for rule evaluation.
 */
public class TriggerContext {
  private String queryId;
  private long queryStartTime;
  // counter names the active triggers want published for this query
  private Set<String> desiredCounters = new HashSet<>();
  // latest counter snapshot for this query, keyed by counter name
  private Map<String, Long> currentCounters = new HashMap<>();

  public TriggerContext(final long queryStartTime, final String queryId) {
    this.queryStartTime = queryStartTime;
    this.queryId = queryId;
  }

  public String getQueryId() {
    return queryId;
  }

  public void setQueryId(final String queryId) {
    this.queryId = queryId;
  }

  public long getQueryStartTime() {
    return queryStartTime;
  }

  public void setQueryStartTime(final long queryStartTime) {
    this.queryStartTime = queryStartTime;
  }

  public Set<String> getDesiredCounters() {
    return desiredCounters;
  }

  public void setDesiredCounters(final Set<String> desiredCounters) {
    this.desiredCounters = desiredCounters;
  }

  public Map<String, Long> getCurrentCounters() {
    return currentCounters;
  }

  public void setCurrentCounters(final Map<String, Long> currentCounters) {
    this.currentCounters = currentCounters;
  }

  /**
   * Wall-clock milliseconds elapsed since the query start time.
   * NOTE(review): based on System.currentTimeMillis(), so system clock
   * adjustments affect the result.
   */
  public long getElapsedTime() {
    return System.currentTimeMillis() - queryStartTime;
  }
}

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java
new file mode 100644
index 0000000..065ab79
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.Objects;
+
+/**
+ * Simple trigger expression for a rule.
+ */
+public class TriggerExpression implements Expression {
+  private CounterLimit counterLimit;
+  private Predicate predicate;
+
+  public TriggerExpression(final CounterLimit counter, final Predicate predicate) {
+    this.counterLimit = counter;
+    this.predicate = predicate;
+  }
+
+  @Override
+  public boolean evaluate(final long current) {
+    if (counterLimit.getLimit() > 0) {
+      if (predicate.equals(Predicate.GREATER_THAN)) {
+        return current > counterLimit.getLimit();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public CounterLimit getCounterLimit() {
+    return counterLimit;
+  }
+
+  @Override
+  public Predicate getPredicate() {
+    return predicate;
+  }
+
+  @Override
+  public Expression clone() {
+    return new TriggerExpression(counterLimit.clone(), predicate);
+  }
+
+  @Override
+  public String toString() {
+    return counterLimit.getName() + " " + predicate.getSymbol() + " " + counterLimit.getLimit();
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = counterLimit == null ? 31 : 31 * counterLimit.hashCode();
+    hash += predicate == null ? 31 * hash : 31 * hash * predicate.hashCode();
+    return 31 * hash;
+  }
+
+  @Override
+  public boolean equals(final Object other) {
+    if (other == null || !(other instanceof TriggerExpression)) {
+      return false;
+    }
+
+    if (other == this) {
+      return true;
+    }
+
+    return Objects.equals(counterLimit, ((TriggerExpression) other).counterLimit) &&
+      Objects.equals(predicate, ((TriggerExpression) other).predicate);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggersFetcher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggersFetcher.java
new file mode 100644
index 0000000..c25ea3c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggersFetcher.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.List;
+
+/**
+ * Interface to fetch rules
+ */
public interface TriggersFetcher {
  /**
   * Returns the triggers configured for the given resource plan.
   *
   * @param resourcePlanName name of the resource plan whose triggers should be fetched
   * @return triggers for the plan (possibly empty)
   */
  List<Trigger> fetch(final String resourcePlanName);
}

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java
new file mode 100644
index 0000000..dd19ce6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Vertex specific counters with limits
+ */
+public class VertexCounterLimit implements CounterLimit {
+  /** Counters collected at vertex level that a trigger can put a limit on. */
+  public enum VertexCounter {
+    TOTAL_TASKS
+  }
+
+  // Which vertex counter this limit applies to.
+  private VertexCounter vertexCounter;
+  // Threshold value for the counter.
+  private long limit;
+
+  public VertexCounterLimit(final VertexCounter vertexCounter, final long limit) {
+    this.vertexCounter = vertexCounter;
+    this.limit = limit;
+  }
+
+  @Override
+  public String getName() {
+    // The counter name is the enum constant itself, e.g. "TOTAL_TASKS".
+    return vertexCounter.name();
+  }
+
+  @Override
+  public long getLimit() {
+    return limit;
+  }
+
+  @Override
+  public CounterLimit clone() {
+    // Enum reference and primitive are immutable, so a new instance is a full copy.
+    return new VertexCounterLimit(vertexCounter, limit);
+  }
+
+  @Override
+  public String toString() {
+    return "counter: " + vertexCounter.name() + " limit: " + limit;
+  }
+
+  @Override
+  public int hashCode() {
+    // NOTE(review): the compound assignment below narrows the long product to int,
+    // so only the low 32 bits of 'limit' contribute; Long.hashCode(limit) would use all bits.
+    int hash = 31 * vertexCounter.hashCode();
+    hash += 31 * limit;
+    return 31 * hash;
+  }
+
+  @Override
+  public boolean equals(final Object other) {
+    // NOTE(review): the explicit null check is redundant — 'instanceof' is false for null.
+    if (other == null || !(other instanceof VertexCounterLimit)) {
+      return false;
+    }
+
+    if (other == this) {
+      return true;
+    }
+
+    VertexCounterLimit otherVcl = (VertexCounterLimit) other;
+    return vertexCounter.equals(otherVcl.vertexCounter) && limit == otherVcl.limit;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index 05eb761..829ea8c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -48,7 +48,7 @@ public class TestTezSessionPool {
     }
 
     @Override
-    public void setupPool(HiveConf conf) throws InterruptedException {
+    public void setupPool(HiveConf conf) throws Exception {
       conf.setVar(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME, "");
       super.setupPool(conf);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 7adf895..17c62cf 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -18,19 +18,27 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.tez.dag.api.TezConfiguration;
-
-import java.util.List;
-
 import org.junit.Test;
 
 public class TestWorkloadManager {
-  private static class MockQam implements QueryAllocationManager {
+  public static class MockQam implements QueryAllocationManager {
     boolean isCalled = false;
 
     @Override
@@ -52,10 +60,10 @@ public class TestWorkloadManager {
     }
   }
 
-  private static class WorkloadManagerForTest extends WorkloadManager {
+  public static class WorkloadManagerForTest extends WorkloadManager {
 
-    WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions,
-        QueryAllocationManager qam) {
+    public WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions,
+      QueryAllocationManager qam) {
       super(yarnQueue, conf, numSessions, qam, null);
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
new file mode 100644
index 0000000..ce1dc6e
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for workload management triggers: expression parsing, evaluation,
+ * size/time unit validation, cloning and error handling.
+ */
+public class TestTrigger {
+  @org.junit.Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testSimpleQueryTrigger() {
+    Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
+      FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
+    Trigger trigger = new ExecutionTrigger("hdfs_read_heavy", expression, Trigger.Action.KILL_QUERY);
+    assertEquals("counter: HDFS_BYTES_READ limit: 1024", expression.getCounterLimit().toString());
+    assertFalse(trigger.apply(1000));
+    assertTrue(trigger.apply(1025));
+
+    expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
+      FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
+    trigger = new ExecutionTrigger("hdfs_write_heavy", expression, Trigger.Action.KILL_QUERY);
+    assertEquals("counter: HDFS_BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString());
+    assertFalse(trigger.apply(1000));
+    assertTrue(trigger.apply(1025));
+
+    expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
+    trigger = new ExecutionTrigger("local_read_heavy", expression, Trigger.Action.KILL_QUERY);
+    assertEquals("counter: BYTES_READ limit: 1024", expression.getCounterLimit().toString());
+    assertFalse(trigger.apply(1000));
+    assertTrue(trigger.apply(1025));
+
+    expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
+    trigger = new ExecutionTrigger("local_write_heavy", expression, Trigger.Action.KILL_QUERY);
+    assertEquals("counter: BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString());
+    assertFalse(trigger.apply(1000));
+    assertTrue(trigger.apply(1025));
+
+    expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024));
+    trigger = new ExecutionTrigger("shuffle_heavy", expression, Trigger.Action.KILL_QUERY);
+    assertEquals("counter: SHUFFLE_BYTES limit: 1024", expression.getCounterLimit().toString());
+    assertFalse(trigger.apply(1000));
+    assertTrue(trigger.apply(1025));
+
+    expression = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
+      .EXECUTION_TIME, 10000));
+    trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.MOVE_TO_POOL.setPoolName("fake_pool"));
+    assertEquals("counter: EXECUTION_TIME limit: 10000", expression.getCounterLimit().toString());
+    assertFalse(trigger.apply(1000));
+    assertTrue(trigger.apply(100000));
+
+    expression = ExpressionFactory.createExpression(new VertexCounterLimit(VertexCounterLimit.VertexCounter
+      .TOTAL_TASKS,10000));
+    trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY);
+    assertEquals("counter: TOTAL_TASKS limit: 10000", expression.getCounterLimit().toString());
+    assertFalse(trigger.apply(1000));
+    assertTrue(trigger.apply(100000));
+  }
+
+  @Test
+  public void testExpressionFromString() {
+    Expression expression = ExpressionFactory.fromString("BYTES_READ>1024");
+    Expression expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+    expression = ExpressionFactory.fromString("BYTES_READ > 1024");
+    assertEquals(expected, expression);
+
+    // Round-trip: toString output must be parseable back to an equal expression.
+    expression = ExpressionFactory.fromString(expected.toString());
+    assertEquals(expected.toString(), expression.toString());
+
+    assertEquals(expected.hashCode(), expression.hashCode());
+    expression = ExpressionFactory.fromString("  BYTES_READ   >   1024  ");
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString("BYTES_WRITTEN > 1024");
+    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" HDFS_BYTES_READ > 1024 ");
+    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
+      FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" HDFS_BYTES_WRITTEN > 1024");
+    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
+      FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" S3A_BYTES_READ > 1024");
+    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a",
+      FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" S3A_BYTES_WRITTEN > 1024");
+    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a",
+      FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    // Counter names are case-insensitive.
+    expression = ExpressionFactory.fromString(" s3a_ByTeS_WRiTTeN > 1024");
+    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a",
+      FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1024");
+    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" EXECUTION_TIME > 300");
+    expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
+      .EXECUTION_TIME, 300));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" ELAPSED_TIME > 300");
+    expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
+      .ELAPSED_TIME, 300));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" elapsed_TIME > 300");
+    expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
+      .ELAPSED_TIME, 300));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" TOTAL_TASKS > 10000");
+    expected = ExpressionFactory.createExpression(new VertexCounterLimit(VertexCounterLimit.VertexCounter
+      .TOTAL_TASKS,10000));
+    assertEquals("counter: TOTAL_TASKS limit: 10000", expression.getCounterLimit().toString());
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+  }
+
+  @Test
+  public void testSizeValidationInTrigger() {
+    // Size suffixes (MB/GB/TB/B/bytes) are normalized to a byte count at parse time.
+    Expression expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100MB");
+    Expression expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100 * 1024 * 1024));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1 gB");
+    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024 * 1024 * 1024));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1 TB");
+    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024L * 1024 * 1024 * 1024));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100 B");
+    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100bytes");
+    expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
+      FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+  }
+
+  @Test
+  public void testIllegalSizeCounterValue1() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid size unit");
+    ExpressionFactory.fromString(" SHUFFLE_BYTES > 300GiB");
+  }
+
+  @Test
+  public void testIllegalSizeCounterValue2() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid size unit");
+    ExpressionFactory.fromString(" SHUFFLE_BYTES > 300 foo");
+  }
+
+  @Test
+  public void testTimeValidationInTrigger() {
+    // Time suffixes are normalized to milliseconds at parse time.
+    Expression expression = ExpressionFactory.fromString(" elapsed_TIME > 300 s");
+    Expression expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
+      .ELAPSED_TIME, 300000));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" elapsed_TIME > 300 seconds");
+    expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
+      .ELAPSED_TIME, 300000));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" elapsed_TIME > 300 sec");
+    expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
+      .ELAPSED_TIME, 300000));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" elapsed_TIME > 300s");
+    expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
+      .ELAPSED_TIME, 300000));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" elapsed_TIME > 300seconds");
+    expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
+      .ELAPSED_TIME, 300000));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" elapsed_TIME > 300sec");
+    expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
+      .ELAPSED_TIME, 300000));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" elapsed_TIME > 300000000 microseconds");
+    expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
+      .ELAPSED_TIME, 300000));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" elapsed_TIME > 1DAY");
+    expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
+      .ELAPSED_TIME, 24 * 60 * 60 * 1000));
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
+  }
+
+  @Test
+  public void testIllegalTimeCounterValue1() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid time unit");
+    ExpressionFactory.fromString(" elapsed_TIME > 300 light years");
+  }
+
+  @Test
+  public void testIllegalTimeCounterValue2() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid time unit");
+    ExpressionFactory.fromString(" elapsed_TIME > 300secTOR");
+  }
+
+  @Test
+  public void testTriggerClone() {
+    Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
+      FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
+    Trigger trigger = new ExecutionTrigger("hdfs_read_heavy", expression, Trigger.Action.KILL_QUERY);
+    Trigger clonedTrigger = trigger.clone();
+    // NOTE(review): identityHashCode inequality is a weak proxy for distinct instances
+    // (hash collisions are theoretically possible); assertNotSame would be more direct.
+    assertNotEquals(System.identityHashCode(trigger), System.identityHashCode(clonedTrigger));
+    assertNotEquals(System.identityHashCode(trigger.getExpression()), System.identityHashCode(clonedTrigger.getExpression()));
+    assertNotEquals(System.identityHashCode(trigger.getExpression().getCounterLimit()),
+      System.identityHashCode(clonedTrigger.getExpression().getCounterLimit()));
+    assertEquals(trigger, clonedTrigger);
+    assertEquals(trigger.hashCode(), clonedTrigger.hashCode());
+
+    expression = ExpressionFactory.fromString(" ELAPSED_TIME > 300");
+    trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY);
+    clonedTrigger = trigger.clone();
+    assertNotEquals(System.identityHashCode(trigger), System.identityHashCode(clonedTrigger));
+    assertNotEquals(System.identityHashCode(trigger.getExpression()), System.identityHashCode(clonedTrigger.getExpression()));
+    assertNotEquals(System.identityHashCode(trigger.getExpression().getCounterLimit()),
+      System.identityHashCode(clonedTrigger.getExpression().getCounterLimit()));
+    assertEquals(trigger, clonedTrigger);
+    assertEquals(trigger.hashCode(), clonedTrigger.hashCode());
+  }
+
+  @Test
+  public void testIllegalExpressionsUnsupportedPredicate() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid predicate in expression");
+    ExpressionFactory.fromString("BYTES_READ < 1024");
+  }
+
+  @Test
+  public void testIllegalExpressionsMissingLimit() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid predicate in expression");
+    ExpressionFactory.fromString("BYTES_READ >");
+  }
+
+  @Test
+  public void testIllegalExpressionsMissingCounter() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Counter name cannot be empty!");
+    ExpressionFactory.fromString("> 1024");
+  }
+
+  @Test
+  public void testIllegalExpressionsMultipleLimit() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid predicate in expression");
+    ExpressionFactory.fromString("BYTES_READ > 1024 > 1025");
+  }
+
+  @Test
+  public void testIllegalExpressionsMultipleCounters() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid predicate in expression");
+    ExpressionFactory.fromString("BYTES_READ > BYTES_READ > 1025");
+  }
+
+  @Test
+  public void testIllegalExpressionsInvalidLimitPost() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid size unit");
+    ExpressionFactory.fromString("BYTES_READ > 1024aaaa");
+  }
+
+  @Test
+  public void testIllegalExpressionsInvalidLimitPre() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid counter value");
+    ExpressionFactory.fromString("BYTES_READ > foo1024");
+  }
+
+  @Test
+  public void testIllegalExpressionsInvalidNegativeLimit() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Illegal value for counter limit. Expected a positive long value.");
+    ExpressionFactory.fromString("BYTES_READ > -1024");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 1cf4392..8224bcc 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -210,7 +210,9 @@ public class OperationManager extends AbstractService {
 
   private Operation removeOperation(OperationHandle opHandle) {
     Operation operation = handleToOperation.remove(opHandle);
-    queryIdOperation.remove(getQueryId(operation));
+    String queryId = getQueryId(operation);
+    queryIdOperation.remove(queryId);
+    LOG.info("Removed queryId: {} corresponding to operation: {}", queryId, opHandle);
     if (operation instanceof SQLOperation) {
       removeSafeQueryInfo(opHandle);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/7d6a5119/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index ec6657d..b121a06 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -112,6 +112,7 @@ public class HiveServer2 extends CompositeService {
   private CuratorFramework zooKeeperClient;
   private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
   private HttpServer webServer; // Web UI
+  private TezSessionPoolManager tezSessionPoolManager;
   private WorkloadManager wm;
 
   public HiveServer2() {
@@ -126,6 +127,11 @@ public class HiveServer2 extends CompositeService {
       if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
         MetricsFactory.init(hiveConf);
       }
+
+      if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+        tezSessionPoolManager = TezSessionPoolManager.getInstance();
+        tezSessionPoolManager.setupPool(hiveConf);
+      }
     } catch (Throwable t) {
       LOG.warn("Could not initiate the HiveServer2 Metrics system.  Metrics may not be reported.", t);
     }
@@ -528,6 +534,24 @@ public class HiveServer2 extends CompositeService {
         throw new ServiceException(e);
       }
     }
+    if (tezSessionPoolManager != null) {
+      try {
+        tezSessionPoolManager.startPool();
+        LOG.info("Started tez session pool manager..");
+      } catch (Exception e) {
+        LOG.error("Error starting tez session pool manager: ", e);
+        throw new ServiceException(e);
+      }
+    }
+    if (wm != null) {
+      try {
+        wm.start();
+        LOG.info("Started workload manager..");
+      } catch (Exception e) {
+        LOG.error("Error starting workload manager", e);
+        throw new ServiceException(e);
+      }
+    }
   }
 
   @Override
@@ -562,9 +586,10 @@ public class HiveServer2 extends CompositeService {
     }
     // There should already be an instance of the session pool manager.
     // If not, ignoring is fine while stopping HiveServer2.
-    if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+    if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS) &&
+      tezSessionPoolManager != null) {
       try {
-        TezSessionPoolManager.getInstance().stop();
+        tezSessionPoolManager.stop();
       } catch (Exception e) {
         LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. "
             + "Shutting down HiveServer2 anyway.", e);
@@ -614,13 +639,6 @@ public class HiveServer2 extends CompositeService {
               TimeUnit.MILLISECONDS);
       HiveServer2 server = null;
       try {
-        // Initialize the pool before we start the server; don't start yet.
-        TezSessionPoolManager sessionPool = null;
-        if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
-          sessionPool = TezSessionPoolManager.getInstance();
-          sessionPool.setupPool(hiveConf);
-        }
-
         // Cleanup the scratch dir before starting
         ServerUtils.cleanUpScratchDir(hiveConf);
         // Schedule task to cleanup dangling scratch dir periodically,
@@ -640,13 +658,6 @@ public class HiveServer2 extends CompositeService {
             "warned upon.", t);
         }
 
-        if (sessionPool != null) {
-          sessionPool.startPool();
-        }
-        if (server.wm != null) {
-          server.wm.start();
-        }
-
         if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
           SparkSessionManagerImpl.getInstance().setup(hiveConf);
         }


Mime
View raw message