hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ekoif...@apache.org
Subject hive git commit: HIVE-12353 When Compactor fails it calls CompactionTxnHandler.markedCleaned(). it should not. (Eugene Koifman, reviewed by Alan Gates) (ADDENDUM)
Date Fri, 22 Jan 2016 02:40:08 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-2.0 e8388ae67 -> 0ffef3f63


HIVE-12353 When Compactor fails it calls CompactionTxnHandler.markedCleaned().  it should
not. (Eugene Koifman, reviewed by Alan Gates) (ADDENDUM)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0ffef3f6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0ffef3f6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0ffef3f6

Branch: refs/heads/branch-2.0
Commit: 0ffef3f63aeadc3eabf6df4a1e3bf7c5ca1fc7ff
Parents: e8388ae
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Thu Jan 21 18:39:57 2016 -0800
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Thu Jan 21 18:39:57 2016 -0800

----------------------------------------------------------------------
 .../ql/txn/AcidCompactionHistoryService.java    | 83 ++++++++++++++++++
 .../txn/compactor/HouseKeeperServiceBase.java   | 92 ++++++++++++++++++++
 2 files changed, 175 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0ffef3f6/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
new file mode 100644
index 0000000..a91ca5c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Housekeeping service that periodically purges obsolete items from compaction history data.
+ * The purge itself is delegated to {@link CompactionTxnHandler#purgeCompactionHistory()}.
+ */
+public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
+  private static final Logger LOG = LoggerFactory.getLogger(AcidCompactionHistoryService.class);
+
+  /**
+   * @return 0, i.e. the first purge is scheduled without any initial delay
+   */
+  @Override
+  protected long getStartDelayMs() {
+    return 0;
+  }
+
+  /**
+   * @return the value of {@link HiveConf.ConfVars#COMPACTOR_HISTORY_REAPER_INTERVAL},
+   *         in milliseconds
+   */
+  @Override
+  protected long getIntervalMs() {
+    return hiveConf.getTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL,
+      TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * @param hiveConf configuration used to create the {@link CompactionTxnHandler}
+   * @param isAliveCounter counter that the returned task increments after each successful purge
+   * @return the task that performs a single purge of the compaction history
+   */
+  @Override
+  protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+    return new ObsoleteEntryReaper(hiveConf, isAliveCounter);
+  }
+
+  @Override
+  public String getServiceDescription() {
+    return "Removes obsolete entries from Compaction History";
+  }
+
+  /**
+   * Task that purges the compaction history once per invocation.
+   */
+  private static final class ObsoleteEntryReaper implements Runnable {
+    private final CompactionTxnHandler txnHandler;
+    private final AtomicInteger isAliveCounter;
+
+    private ObsoleteEntryReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+      txnHandler = new CompactionTxnHandler(hiveConf);
+      this.isAliveCounter = isAliveCounter;
+    }
+
+    /**
+     * Purges the compaction history, increments the liveness counter and logs how long the
+     * purge took (in whole seconds).  The counter is only incremented when the purge did not
+     * throw.
+     */
+    @Override
+    public void run() {
+      try {
+        long startTime = System.currentTimeMillis();
+        txnHandler.purgeCompactionHistory();
+        int count = isAliveCounter.incrementAndGet();
+        long elapsedSec = (System.currentTimeMillis() - startTime) / 1000;
+        LOG.info("History reaper ran for {} seconds.  isAliveCounter={}", elapsedSec, count);
+      } catch (Throwable t) {
+        // Nothing may escape from here: HouseKeeperServiceBase runs this task through
+        // ScheduledExecutorService.scheduleAtFixedRate(), which suppresses all subsequent
+        // executions once a run throws.  Log the failure and wait for the next run instead.
+        // The trailing Throwable argument makes SLF4J log the stack trace as well.
+        LOG.error("Serious error in {}: {}", Thread.currentThread().getName(),
+          t.getMessage(), t);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffef3f6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
new file mode 100644
index 0000000..947f17c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HouseKeeperService;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Common scaffolding for {@link HouseKeeperService} implementations.  Runs the action supplied
+ * by the subclass at a fixed rate on a scheduled pool with a single thread.  Nothing is
+ * scheduled when the configured transaction manager does not support Acid.
+ */
+public abstract class HouseKeeperServiceBase implements HouseKeeperService {
+  private static final Logger LOG = LoggerFactory.getLogger(HouseKeeperServiceBase.class);
+  /** Pool running the scheduled action; {@code null} until started and again after stop(). */
+  private ScheduledExecutorService pool = null;
+  protected final AtomicInteger isAliveCounter = new AtomicInteger(Integer.MIN_VALUE);
+  /** Configuration passed to {@link #start(HiveConf)}; not set before that call. */
+  protected HiveConf hiveConf;
+
+  /**
+   * Schedules the action returned by {@link #getScheduedAction(HiveConf, AtomicInteger)} using
+   * {@link #getStartDelayMs()} and {@link #getIntervalMs()}.  Returns without scheduling
+   * anything if the transaction manager obtained for {@code hiveConf} does not support Acid.
+   *
+   * @param hiveConf configuration for this service; also handed to the scheduled action
+   */
+  @Override
+  public void start(HiveConf hiveConf) throws Exception {
+    this.hiveConf = hiveConf;
+    HiveTxnManager mgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf);
+    if(!mgr.supportsAcid()) {
+      LOG.info(this.getClass().getName() + " not started since " +
+        mgr.getClass().getName()  + " does not support Acid.");
+      return;//there are no transactions in this case
+    }
+    // Resolved here, outside the anonymous ThreadFactory, so that worker threads are named
+    // after the concrete service class.  Inside the factory, this.getClass() would be the
+    // anonymous class (HouseKeeperServiceBase$1), which says nothing about which service runs.
+    final String threadNamePrefix = getClass().getName() + "-";
+    pool = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+      private final AtomicInteger threadCounter = new AtomicInteger();
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, threadNamePrefix + threadCounter.getAndIncrement());
+      }
+    });
+
+    TimeUnit tu = TimeUnit.MILLISECONDS;
+    Runnable action = getScheduedAction(hiveConf, isAliveCounter);
+    // Read each value once so that what gets logged is exactly what was scheduled.
+    long startDelayMs = getStartDelayMs();
+    long intervalMs = getIntervalMs();
+    pool.scheduleAtFixedRate(action, startDelayMs, intervalMs, tu);
+    LOG.info("Started " + this.getClass().getName() + " with delay/interval = " + startDelayMs +
+      "/" + intervalMs + " " + tu);
+  }
+
+  /**
+   * Initiates shutdown of the scheduling pool, if one was created, and drops the reference
+   * to it.  Does nothing if the service was never started.
+   */
+  @Override
+  public void stop() {
+    if(pool != null && !pool.isShutdown()) {
+      pool.shutdown();
+    }
+    pool = null;
+  }
+
+  /**
+   * This is used for testing only.  Each time the housekeeper runs, counter is incremented
+   * by 1.  Starts with {@link java.lang.Integer#MIN_VALUE}
+   */
+  @Override
+  public int getIsAliveCounter() {
+    return isAliveCounter.get();
+  }
+
+  /**
+   * Delay in millis before first run of the task of this service.
+   */
+  protected abstract long getStartDelayMs();
+  /**
+   * Determines how frequently (in millis) the service is running its task.
+   */
+  protected abstract long getIntervalMs();
+
+  /**
+   * The actual task implementation.  Must increment the counter on each iteration.
+   */
+  protected abstract Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter);
+}


Mime
View raw message