Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F2D911806C for ; Thu, 25 Jun 2015 23:46:56 +0000 (UTC) Received: (qmail 65501 invoked by uid 500); 25 Jun 2015 23:46:56 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 65277 invoked by uid 500); 25 Jun 2015 23:46:56 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 65256 invoked by uid 99); 25 Jun 2015 23:46:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Jun 2015 23:46:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 90AE0E36B4; Thu, 25 Jun 2015 23:46:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mbertozzi@apache.org To: commits@hbase.apache.org Date: Thu, 25 Jun 2015 23:46:57 -0000 Message-Id: <8a24b0699c3a4d4b9cdfad93909a6fd7@git.apache.org> In-Reply-To: <1d4adc8ed9914ddbb7881842215c9b85@git.apache.org> References: <1d4adc8ed9914ddbb7881842215c9b85@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] hbase git commit: HBASE-13950 Add a NoopProcedureStore for testing HBASE-13950 Add a NoopProcedureStore for testing Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/70f0ca3c Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/70f0ca3c Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/70f0ca3c Branch: refs/heads/master Commit: 70f0ca3c6d485d504d49652801fd841de2086157 Parents: db5dd1e Author: Matteo Bertozzi Authored: Thu Jun 25 16:29:57 2015 -0700 Committer: Matteo Bertozzi Committed: Thu Jun 25 16:44:58 2015 -0700 ---------------------------------------------------------------------- .../procedure2/store/NoopProcedureStore.java | 73 ++++++++++++++++++++ .../procedure2/store/ProcedureStoreBase.java | 66 ++++++++++++++++++ .../procedure2/store/wal/WALProcedureStore.java | 44 +++--------- .../procedure2/ProcedureTestingUtility.java | 15 ++++ 4 files changed, 162 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/70f0ca3c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java new file mode 100644 index 0000000..62448fb --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2.store; + +import java.io.IOException; + +import org.apache.hadoop.hbase.procedure2.Procedure; + +/** + * An In-Memory store that does not keep track of the procedures inserted. + */ +public class NoopProcedureStore extends ProcedureStoreBase { + private int numThreads; + + @Override + public void start(int numThreads) throws IOException { + if (!setRunning(true)) { + return; + } + this.numThreads = numThreads; + } + + @Override + public void stop(boolean abort) { + setRunning(false); + } + + @Override + public void recoverLease() throws IOException { + // no-op + } + + @Override + public int getNumThreads() { + return numThreads; + } + + @Override + public void load(final ProcedureLoader loader) throws IOException { + loader.setMaxProcId(0); + } + + @Override + public void insert(Procedure proc, Procedure[] subprocs) { + // no-op + } + + @Override + public void update(Procedure proc) { + // no-op + } + + @Override + public void delete(long procId) { + // no-op + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/70f0ca3c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java new file mode 100644 index 0000000..e5653b6 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2.store; + +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Base class for {@link ProcedureStore}s. + */ +public abstract class ProcedureStoreBase implements ProcedureStore { + private final CopyOnWriteArrayList listeners = + new CopyOnWriteArrayList(); + + private final AtomicBoolean running = new AtomicBoolean(false); + + /** + * Change the state to 'isRunning', + * returns true if the store state was changed, + * false if the store was already in that state. + * @param isRunning the state to set. + * @return true if the store state was changed, otherwise false. + */ + protected boolean setRunning(boolean isRunning) { + return running.getAndSet(isRunning) != isRunning; + } + + @Override + public boolean isRunning() { + return running.get(); + } + + @Override + public void registerListener(ProcedureStoreListener listener) { + listeners.add(listener); + } + + @Override + public boolean unregisterListener(ProcedureStoreListener listener) { + return listeners.remove(listener); + } + + protected void sendAbortProcessSignal() { + if (!this.listeners.isEmpty()) { + for (ProcedureStoreListener listener : this.listeners) { + listener.abortProcess(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/70f0ca3c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index f4a52b1..54b53dc 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.LinkedTransferQueue; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.Arrays; import java.util.ArrayList; @@ -47,7 +46,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; import org.apache.hadoop.hbase.procedure2.util.StringUtils; @@ -58,7 +57,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHe */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class WALProcedureStore implements ProcedureStore { +public class WALProcedureStore extends ProcedureStoreBase { private static final Log LOG = LogFactory.getLog(WALProcedureStore.class); public interface LeaseRecovery { @@ -76,12 +75,8 @@ public class WALProcedureStore implements ProcedureStore { private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold"; private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M - private final CopyOnWriteArrayList listeners = - new CopyOnWriteArrayList(); - private final LinkedList logs = new LinkedList(); private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker(); - private final AtomicBoolean running = new AtomicBoolean(false); private final ReentrantLock lock = new ReentrantLock(); private final Condition waitCond = lock.newCondition(); private final Condition slotCond = lock.newCondition(); @@ -117,7 +112,7 @@ public class WALProcedureStore implements ProcedureStore { @Override public void start(int numSlots) throws IOException { - if (running.getAndSet(true)) { + if (!setRunning(true)) { return; } @@ -137,7 +132,7 @@ public class WALProcedureStore implements ProcedureStore { syncThread = new Thread("WALProcedureStoreSyncThread") { @Override public void run() { - while (running.get()) { + while (isRunning()) { try { syncLoop(); } catch (IOException e) { @@ -152,7 +147,7 @@ public class WALProcedureStore implements ProcedureStore { @Override public void stop(boolean abort) { - if (!running.getAndSet(false)) { + if (!setRunning(false)) { return; } @@ -186,11 +181,6 @@ public class WALProcedureStore implements ProcedureStore { } @Override - public boolean isRunning() { - return running.get(); - } - - @Override public int getNumThreads() { return slots == null ? 0 : slots.length; } @@ -200,20 +190,10 @@ public class WALProcedureStore implements ProcedureStore { } @Override - public void registerListener(ProcedureStoreListener listener) { - this.listeners.add(listener); - } - - @Override - public boolean unregisterListener(ProcedureStoreListener listener) { - return this.listeners.remove(listener); - } - - @Override public void recoverLease() throws IOException { LOG.info("Starting WAL Procedure Store lease recovery"); FileStatus[] oldLogs = getLogFiles(); - while (running.get()) { + while (isRunning()) { // Get Log-MaxID and recover lease on old logs flushLogId = initOldLogs(oldLogs); @@ -462,7 +442,7 @@ public class WALProcedureStore implements ProcedureStore { private void syncLoop() throws IOException { inSync.set(false); - while (running.get()) { + while (isRunning()) { lock.lock(); try { // Wait until new data is available @@ -522,7 +502,7 @@ public class WALProcedureStore implements ProcedureStore { sendAbortProcessSignal(); } } - } while (running.get()); + } while (isRunning()); return totalSynced; } @@ -548,14 +528,6 @@ public class WALProcedureStore implements ProcedureStore { return totalSynced; } - private void sendAbortProcessSignal() { - if (!this.listeners.isEmpty()) { - for (ProcedureStoreListener listener : this.listeners) { - listener.abortProcess(); - } - } - } - private boolean rollWriterOrDie() { try { return rollWriter(); http://git-wip-us.apache.org/repos/asf/hbase/blob/70f0ca3c/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index ddea9d2..a90e056 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import static org.junit.Assert.assertEquals; @@ -109,6 +110,20 @@ public class ProcedureTestingUtility { ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value); } + public static long submitAndWait(Configuration conf, TEnv env, Procedure proc) + throws IOException { + NoopProcedureStore procStore = new NoopProcedureStore(); + ProcedureExecutor procExecutor = new ProcedureExecutor(conf, env, procStore); + procStore.start(1); + procExecutor.start(1, false); + try { + return submitAndWait(procExecutor, proc); + } finally { + procStore.stop(false); + procExecutor.stop(); + } + } + public static long submitAndWait(ProcedureExecutor procExecutor, Procedure proc) { long procId = procExecutor.submitProcedure(proc); waitProcedure(procExecutor, procId);