Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3EC8C200D3B for ; Thu, 26 Oct 2017 21:41:21 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3D634160BF3; Thu, 26 Oct 2017 19:41:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 32D8F1609E5 for ; Thu, 26 Oct 2017 21:41:20 +0200 (CEST) Received: (qmail 53913 invoked by uid 500); 26 Oct 2017 19:41:19 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 53900 invoked by uid 99); 26 Oct 2017 19:41:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Oct 2017 19:41:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 69D0ADFC64; Thu, 26 Oct 2017 19:41:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Thu, 26 Oct 2017 19:41:19 -0000 Message-Id: <730febb625a246c191029ee797d72966@git.apache.org> In-Reply-To: <0355c54eb60049b3a879348698738fab@git.apache.org> References: <0355c54eb60049b3a879348698738fab@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/4] activemq-artemis git commit: ARTEMIS-1447 JDBC NodeManager to support JDBC HA Shared Store archived-at: Thu, 26 Oct 2017 19:41:21 -0000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java new file mode 100644 index 0000000..136f5db --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl.jdbc; + +import java.sql.SQLException; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class JdbcLeaseLockTest { + + private static final long DEFAULT_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(10); + private static final SQLProvider SQL_PROVIDER = new DerbySQLProvider.Factory().create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER); + private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true"; + private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; + private JdbcSharedStateManager jdbcSharedStateManager; + + private LeaseLock lock() { + return lock(DEFAULT_LOCK_EXPIRATION_MILLIS); + } + + private LeaseLock lock(long acquireMillis) { + try { + return JdbcSharedStateManager.createLiveLock(UUID.randomUUID().toString(), jdbcSharedStateManager.getConnection(), SQL_PROVIDER, acquireMillis, 0); + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + + @Before + public void createLockTable() { + jdbcSharedStateManager = JdbcSharedStateManager.usingConnectionUrl(UUID.randomUUID().toString(), DEFAULT_LOCK_EXPIRATION_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER); + } + + @After + public void dropLockTable() throws Exception { + jdbcSharedStateManager.destroy(); + jdbcSharedStateManager.close(); + } + + @Test + public void shouldAcquireLock() { + final LeaseLock lock = lock(); + final boolean acquired = lock.tryAcquire(); + Assert.assertTrue("Must acquire the lock!", acquired); + try { + Assert.assertTrue("The lock is been held by the caller!", lock.isHeldByCaller()); + } finally { + lock.release(); + } + } + + @Test + public void shouldNotAcquireLockWhenAlreadyHeldByOthers() { + final LeaseLock lock = lock(); + Assert.assertTrue("Must acquire the lock", lock.tryAcquire()); + try { + Assert.assertTrue("Lock held by the caller", lock.isHeldByCaller()); + final LeaseLock failingLock = lock(); + Assert.assertFalse("lock already held by other", failingLock.tryAcquire()); + Assert.assertFalse("lock already held by other", failingLock.isHeldByCaller()); + Assert.assertTrue("lock already held by other", failingLock.isHeld()); + } finally { + lock.release(); + } + } + + @Test + public void shouldNotAcquireLockTwice() { + final LeaseLock lock = lock(); + Assert.assertTrue("Must acquire the lock", lock.tryAcquire()); + try { + Assert.assertFalse("lock already acquired", lock.tryAcquire()); + } finally { + lock.release(); + } + } + + @Test + public void shouldNotCorruptGuardedState() throws InterruptedException { + final AtomicLong sharedState = new AtomicLong(0); + final int producers = 2; + final int writesPerProducer = 10; + final long idleMillis = 1000; + final long millisToAcquireLock = writesPerProducer * (producers - 1) * idleMillis; + final LeaseLock.Pauser pauser = LeaseLock.Pauser.sleep(idleMillis, TimeUnit.MILLISECONDS); + final CountDownLatch finished = new CountDownLatch(producers); + final LeaseLock[] locks = new LeaseLock[producers]; + final AtomicInteger lockIndex = new AtomicInteger(0); + final Runnable producerTask = () -> { + final LeaseLock lock = locks[lockIndex.getAndIncrement()]; + try { + for (int i = 0; i < writesPerProducer; i++) { + final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(millisToAcquireLock, pauser, () -> true); + if (acquireResult != LeaseLock.AcquireResult.Done) { + throw new IllegalStateException(acquireResult + " from " + Thread.currentThread()); + } + //avoid the atomic getAndIncrement operation on purpose + sharedState.lazySet(sharedState.get() + 1); + lock.release(); + } + } finally { + finished.countDown(); + } + }; + final Thread[] producerThreads = new Thread[producers]; + for (int i = 0; i < producers; i++) { + locks[i] = lock(); + producerThreads[i] = new Thread(producerTask); + } + Stream.of(producerThreads).forEach(Thread::start); + final long maxTestTime = millisToAcquireLock * writesPerProducer * producers; + Assert.assertTrue("Each producers must complete the writes", finished.await(maxTestTime, TimeUnit.MILLISECONDS)); + Assert.assertEquals("locks hasn't mutual excluded producers", writesPerProducer * producers, sharedState.get()); + } + + @Test + public void shouldAcquireExpiredLock() throws InterruptedException { + final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); + Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); + try { + Thread.sleep(lock.expirationMillis() * 2); + Assert.assertFalse("lock is already expired", lock.isHeldByCaller()); + Assert.assertFalse("lock is already expired", lock.isHeld()); + Assert.assertTrue("lock is already expired", lock.tryAcquire()); + } finally { + lock.release(); + } + } + + @Test + public void shouldOtherAcquireExpiredLock() throws InterruptedException { + final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); + Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); + try { + Thread.sleep(lock.expirationMillis() * 2); + Assert.assertFalse("lock is already expired", lock.isHeldByCaller()); + Assert.assertFalse("lock is already expired", lock.isHeld()); + final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10)); + try { + Assert.assertTrue("lock is already expired", otherLock.tryAcquire()); + } finally { + otherLock.release(); + } + } finally { + lock.release(); + } + } + + @Test + public void shouldRenewAcquiredLock() throws InterruptedException { + final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(10)); + Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); + try { + Assert.assertTrue("lock is owned", lock.renew()); + } finally { + lock.release(); + } + } + + @Test + public void shouldNotRenewReleasedLock() throws InterruptedException { + final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(10)); + Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); + lock.release(); + Assert.assertFalse("lock is already released", lock.isHeldByCaller()); + Assert.assertFalse("lock is already released", lock.isHeld()); + Assert.assertFalse("lock is already released", lock.renew()); + } + + @Test + public void shouldRenewExpiredLockNotAcquiredByOthers() throws InterruptedException { + final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); + Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); + try { + Thread.sleep(lock.expirationMillis() * 2); + Assert.assertFalse("lock is already expired", lock.isHeldByCaller()); + Assert.assertFalse("lock is already expired", lock.isHeld()); + Assert.assertTrue("lock is owned", lock.renew()); + } finally { + lock.release(); + } + } + + @Test + public void shouldNotRenewLockAcquiredByOthers() throws InterruptedException { + final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); + Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); + try { + Thread.sleep(lock.expirationMillis() * 2); + Assert.assertFalse("lock is already expired", lock.isHeldByCaller()); + Assert.assertFalse("lock is already expired", lock.isHeld()); + final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10)); + Assert.assertTrue("lock is already expired", otherLock.tryAcquire()); + try { + Assert.assertFalse("lock is owned by others", lock.renew()); + } finally { + otherLock.release(); + } + } finally { + lock.release(); + } + } +} + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/09a5d6f1/docs/user-manual/en/persistence.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/persistence.md b/docs/user-manual/en/persistence.md index 5264b55..7bd4618 100644 --- a/docs/user-manual/en/persistence.md +++ b/docs/user-manual/en/persistence.md @@ -452,6 +452,21 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a The JDBC network connection timeout in milliseconds. The default value is 20000 milliseconds (ie 20 seconds). + +- `jdbc-lock-acquisition-timeout` + + The max allowed time in milliseconds while trying to acquire a JDBC lock. The default value + is 60000 milliseconds (ie 60 seconds). + +- `jdbc-lock-renew-period` + + The period in milliseconds of the keep alive service of a JDBC lock. The default value + is 2000 milliseconds (ie 2 seconds). + +- `jdbc-lock-expiration` + + The time in milliseconds a JDBC lock is considered valid without keeping it alive. The default value + is 20000 milliseconds (ie 20 seconds). Note that some DBMS (e.g. Oracle, 30 chars) have restrictions on the size of table names, this should be taken into consideration when configuring table names for the Artemis database store, pay particular attention to the page store table name, which can be appended with a unique ID of up to 20 characters. (for Oracle this would mean configuring a page-store-table-name of max size of 10 chars).