Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 54AAA18301 for ; Tue, 1 Mar 2016 19:04:17 +0000 (UTC) Received: (qmail 79844 invoked by uid 500); 1 Mar 2016 19:04:10 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 79537 invoked by uid 500); 1 Mar 2016 19:04:10 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 79117 invoked by uid 99); 1 Mar 2016 19:04:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Mar 2016 19:04:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C4B38E022F; Tue, 1 Mar 2016 19:04:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 01 Mar 2016 19:04:15 -0000 Message-Id: In-Reply-To: <834bb63a82524a86acc741ae74240559@git.apache.org> References: <834bb63a82524a86acc741ae74240559@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/52] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java deleted file mode 100644 index 352d2f0..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java +++ /dev/null @@ -1,249 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.util.Vector; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ConnectionControl; -import org.apache.activemq.store.kahadb.disk.journal.FileAppender; -import org.apache.activemq.store.kahadb.disk.journal.Journal; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KahaDBFastEnqueueTest { - - private static final Logger LOG = LoggerFactory.getLogger(KahaDBFastEnqueueTest.class); - private BrokerService broker; - private ActiveMQConnectionFactory connectionFactory; - KahaDBPersistenceAdapter kahaDBPersistenceAdapter; - private final Destination destination = new ActiveMQQueue("Test"); - private final String payloadString = new String(new byte[6 * 1024]); - private final boolean useBytesMessage = true; - private final int parallelProducer = 20; - private final Vector exceptions = new Vector<>(); - long toSend = 10000; - - // use with: - // -Xmx4g -Dorg.apache.kahadb.journal.appender.WRITE_STAT_WINDOW=10000 -Dorg.apache.kahadb.journal.CALLER_BUFFER_APPENDER=true - @Test - public void testPublishNoConsumer() throws Exception { - - startBroker(true, 10); - - final AtomicLong sharedCount = new AtomicLong(toSend); - long start = System.currentTimeMillis(); - ExecutorService executorService = Executors.newCachedThreadPool(); - for (int i = 0; i < parallelProducer; i++) { - executorService.execute(new Runnable() { - @Override - public void run() { - try { - publishMessages(sharedCount, 0); - } - catch (Exception e) { - exceptions.add(e); - } - } - }); - } - executorService.shutdown(); - executorService.awaitTermination(30, TimeUnit.MINUTES); - assertTrue("Producers done in time", executorService.isTerminated()); - assertTrue("No exceptions: " + exceptions, exceptions.isEmpty()); - long totalSent = toSend * payloadString.length(); - - double duration = System.currentTimeMillis() - start; - stopBroker(); - LOG.info("Duration: " + duration + "ms"); - LOG.info("Rate: " + (toSend * 1000 / duration) + "m/s"); - LOG.info("Total send: " + totalSent); - LOG.info("Total journal write: " + kahaDBPersistenceAdapter.getStore().getJournal().length()); - LOG.info("Total index size " + kahaDBPersistenceAdapter.getStore().getPageFile().getDiskSize()); - LOG.info("Total store size: " + kahaDBPersistenceAdapter.size()); - LOG.info("Journal writes %: " + kahaDBPersistenceAdapter.getStore().getJournal().length() / (double) totalSent * 100 + "%"); - - restartBroker(0, 1200000); - consumeMessages(toSend); - } - - @Test - public void testPublishNoConsumerNoCheckpoint() throws Exception { - - toSend = 100; - startBroker(true, 0); - - final AtomicLong sharedCount = new AtomicLong(toSend); - long start = System.currentTimeMillis(); - ExecutorService executorService = Executors.newCachedThreadPool(); - for (int i = 0; i < parallelProducer; i++) { - executorService.execute(new Runnable() { - @Override - public void run() { - try { - publishMessages(sharedCount, 0); - } - catch (Exception e) { - exceptions.add(e); - } - } - }); - } - executorService.shutdown(); - executorService.awaitTermination(30, TimeUnit.MINUTES); - assertTrue("Producers done in time", executorService.isTerminated()); - assertTrue("No exceptions: " + exceptions, exceptions.isEmpty()); - long totalSent = toSend * payloadString.length(); - - broker.getAdminView().gc(); - - double duration = System.currentTimeMillis() - start; - stopBroker(); - LOG.info("Duration: " + duration + "ms"); - LOG.info("Rate: " + (toSend * 1000 / duration) + "m/s"); - LOG.info("Total send: " + totalSent); - LOG.info("Total journal write: " + kahaDBPersistenceAdapter.getStore().getJournal().length()); - LOG.info("Total index size " + kahaDBPersistenceAdapter.getStore().getPageFile().getDiskSize()); - LOG.info("Total store size: " + kahaDBPersistenceAdapter.size()); - LOG.info("Journal writes %: " + kahaDBPersistenceAdapter.getStore().getJournal().length() / (double) totalSent * 100 + "%"); - - restartBroker(0, 0); - consumeMessages(toSend); - } - - private void consumeMessages(long count) throws Exception { - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.setWatchTopicAdvisories(false); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(destination); - for (int i = 0; i < count; i++) { - assertNotNull("got message " + i, consumer.receive(10000)); - } - assertNull("none left over", consumer.receive(2000)); - } - - private void restartBroker(int restartDelay, int checkpoint) throws Exception { - stopBroker(); - TimeUnit.MILLISECONDS.sleep(restartDelay); - startBroker(false, checkpoint); - } - - @Before - public void setProps() { - System.setProperty(Journal.CALLER_BUFFER_APPENDER, Boolean.toString(true)); - System.setProperty(FileAppender.PROPERTY_LOG_WRITE_STAT_WINDOW, "10000"); - } - - @After - public void stopBroker() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - System.clearProperty(Journal.CALLER_BUFFER_APPENDER); - System.clearProperty(FileAppender.PROPERTY_LOG_WRITE_STAT_WINDOW); - } - - final double sampleRate = 100000; - - private void publishMessages(AtomicLong count, int expiry) throws Exception { - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.setWatchTopicAdvisories(false); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer producer = session.createProducer(destination); - Long start = System.currentTimeMillis(); - long i = 0L; - while ((i = count.getAndDecrement()) > 0) { - Message message = null; - if (useBytesMessage) { - message = session.createBytesMessage(); - ((BytesMessage) message).writeBytes(payloadString.getBytes()); - } - else { - message = session.createTextMessage(payloadString); - } - producer.send(message, DeliveryMode.PERSISTENT, 5, expiry); - if (i != toSend && i % sampleRate == 0) { - long now = System.currentTimeMillis(); - LOG.info("Remainder: " + i + ", rate: " + sampleRate * 1000 / (now - start) + "m/s"); - start = now; - } - } - connection.syncSendPacket(new ConnectionControl()); - connection.close(); - } - - public void startBroker(boolean deleteAllMessages, int checkPointPeriod) throws Exception { - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(deleteAllMessages); - kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - kahaDBPersistenceAdapter.setEnableJournalDiskSyncs(false); - // defer checkpoints which require a sync - kahaDBPersistenceAdapter.setCleanupInterval(checkPointPeriod); - kahaDBPersistenceAdapter.setCheckpointInterval(checkPointPeriod); - - // optimise for disk best batch rate - kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(24 * 1024 * 1024); //4mb default - kahaDBPersistenceAdapter.setJournalMaxFileLength(128 * 1024 * 1024); // 32mb default - // keep index in memory - kahaDBPersistenceAdapter.setIndexCacheSize(500000); - kahaDBPersistenceAdapter.setIndexWriteBatchSize(500000); - kahaDBPersistenceAdapter.setEnableIndexRecoveryFile(false); - kahaDBPersistenceAdapter.setEnableIndexDiskSyncs(false); - - broker.addConnector("tcp://0.0.0.0:0"); - broker.start(); - - String options = "?jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192"; - connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + options); - } - - @Test - public void testRollover() throws Exception { - byte flip = 0x1; - for (long i = 0; i < Short.MAX_VALUE; i++) { - assertEquals("0 @:" + i, 0, flip ^= (byte) 1); - assertEquals("1 @:" + i, 1, flip ^= (byte) 1); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java deleted file mode 100644 index 24229c9..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.FilenameFilter; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - */ -public class KahaDBIndexLocationTest { - - private static final Logger LOG = LoggerFactory.getLogger(KahaDBIndexLocationTest.class); - - @Rule - public TestName name = new TestName(); - - private BrokerService broker; - - private final File testDataDir = new File("target/activemq-data/QueuePurgeTest"); - private final File kahaDataDir = new File(testDataDir, "kahadb"); - private final File kahaIndexDir = new File(testDataDir, "kahadb/index"); - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - startBroker(); - } - - @After - public void tearDown() throws Exception { - stopBroker(); - } - - private void startBroker() throws Exception { - createBroker(); - broker.start(); - broker.waitUntilStarted(); - } - - private void stopBroker() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - private void restartBroker() throws Exception { - stopBroker(); - createBroker(); - broker.start(); - broker.waitUntilStarted(); - } - - private void createBroker() throws Exception { - broker = new BrokerService(); - - KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); - persistenceAdapter.setDirectory(kahaDataDir); - persistenceAdapter.setIndexDirectory(kahaIndexDir); - - broker.setDataDirectoryFile(testDataDir); - broker.setUseJmx(false); - broker.setAdvisorySupport(false); - broker.setSchedulerSupport(false); - broker.setDeleteAllMessagesOnStartup(true); - broker.setPersistenceAdapter(persistenceAdapter); - } - - @Test - public void testIndexDirExists() throws Exception { - LOG.info("Index dir is configured as: {}", kahaIndexDir); - assertTrue(kahaDataDir.exists()); - assertTrue(kahaIndexDir.exists()); - - String[] index = kahaIndexDir.list(new FilenameFilter() { - - @Override - public boolean accept(File dir, String name) { - LOG.info("Testing filename: {}", name); - return name.endsWith("data") || name.endsWith("redo"); - } - }); - - String[] journal = kahaDataDir.list(new FilenameFilter() { - - @Override - public boolean accept(File dir, String name) { - LOG.info("Testing filename: {}", name); - return name.endsWith("log") || name.equals("lock"); - } - }); - - produceMessages(); - - // Should be db.data and db.redo and nothing else. - assertNotNull(index); - assertEquals(2, index.length); - - // Should contain the initial log for the journal and the lock. - assertNotNull(journal); - assertEquals(2, journal.length); - } - - @Test - public void testRestartWithDeleteWorksWhenIndexIsSeparate() throws Exception { - produceMessages(); - restartBroker(); - - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false"); - Connection connection = cf.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(name.getMethodName()); - MessageConsumer consumer = session.createConsumer(queue); - assertNull(consumer.receive(2000)); - } - - private void produceMessages() throws Exception { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false"); - Connection connection = cf.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(name.getMethodName()); - MessageProducer producer = session.createProducer(queue); - for (int i = 0; i < 5; ++i) { - producer.send(session.createTextMessage("test:" + i)); - } - connection.close(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java deleted file mode 100644 index bb0e954..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.store.kahadb; - -import junit.framework.Test; - -import org.apache.activemq.store.MessagePriorityTest; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; - -public class KahaDBMessagePriorityTest extends MessagePriorityTest { - - @Override - protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception { - KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); - adapter.setConcurrentStoreAndDispatchQueues(false); - adapter.setConcurrentStoreAndDispatchTopics(false); - adapter.deleteAllMessages(); - return adapter; - } - - public static Test suite() { - return suite(KahaDBMessagePriorityTest.class); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java deleted file mode 100644 index cddbd71..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import java.io.File; -import java.io.IOException; - -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.PersistenceAdapterTestSupport; - -/** - * @author Hiram Chirino - */ -public class KahaDBPersistenceAdapterTest extends PersistenceAdapterTestSupport { - - @Override - protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException { - KahaDBStore kaha = new KahaDBStore(); - kaha.setDirectory(new File("target/activemq-data/kahadb")); - if (delete) { - kaha.deleteAllMessages(); - } - return kaha; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java deleted file mode 100644 index b8fef90..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import java.io.File; - -import junit.framework.Test; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerTest; -import org.apache.activemq.util.IOHelper; - -/** - * Once the wire format is completed we can test against real persistence storage. - */ -public class KahaDBStoreBrokerTest extends BrokerTest { - - @Override - protected void setUp() throws Exception { - this.setAutoFail(true); - super.setUp(); - } - - @Override - protected BrokerService createBroker() throws Exception { - BrokerService broker = new BrokerService(); - KahaDBStore kaha = new KahaDBStore(); - File directory = new File("target/activemq-data/kahadb"); - IOHelper.deleteChildren(directory); - kaha.setDirectory(directory); - kaha.deleteAllMessages(); - broker.setPersistenceAdapter(kaha); - return broker; - } - - protected BrokerService createRestartedBroker() throws Exception { - BrokerService broker = new BrokerService(); - KahaDBStore kaha = new KahaDBStore(); - kaha.setDirectory(new File("target/activemq-data/kahadb")); - broker.setPersistenceAdapter(kaha); - return broker; - } - - public static Test suite() { - return suite(KahaDBStoreBrokerTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java deleted file mode 100644 index e672890..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import java.io.File; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.StoreOrderTest; - -// https://issues.apache.org/activemq/browse/AMQ-2594 -public class KahaDBStoreOrderTest extends StoreOrderTest { - - @Override - protected void setPersistentAdapter(BrokerService brokerService) throws Exception { - KahaDBStore kaha = new KahaDBStore(); - File directory = new File("target/activemq-data/kahadb/storeOrder"); - kaha.setDirectory(directory); - brokerService.setPersistenceAdapter(kaha); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java deleted file mode 100644 index bddfde8..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.ArrayList; - -import junit.framework.Test; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.RecoveryBrokerTest; -import org.apache.activemq.broker.StubConnection; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.SessionInfo; -import org.apache.commons.io.FileUtils; - -/** - * Used to verify that recovery works correctly against - */ -public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest { - - public static final String KAHADB_DIR_BASE = "target/activemq-data/kahadb"; - public static String kahaDbDirectoryName; - - enum CorruptionType {None, FailToLoad, LoadInvalid, LoadCorrupt, LoadOrderIndex0} - - public CorruptionType failTest = CorruptionType.None; - - @Override - protected void setUp() throws Exception { - kahaDbDirectoryName = KAHADB_DIR_BASE + "/" + System.currentTimeMillis(); - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - try { - File kahaDbDir = new File(kahaDbDirectoryName); - FileUtils.deleteDirectory(kahaDbDir); - } - catch (IOException e) { - } - } - - @Override - protected BrokerService createBroker() throws Exception { - BrokerService broker = new BrokerService(); - KahaDBStore kaha = new KahaDBStore(); - kaha.setDirectory(new File(kahaDbDirectoryName)); - kaha.deleteAllMessages(); - kaha.setCheckForCorruptJournalFiles(failTest == CorruptionType.LoadOrderIndex0); - broker.setPersistenceAdapter(kaha); - return broker; - } - - @Override - @SuppressWarnings("resource") - protected BrokerService createRestartedBroker() throws Exception { - - // corrupting index - File index = new File(kahaDbDirectoryName + "/db.data"); - RandomAccessFile raf = new RandomAccessFile(index, "rw"); - switch (failTest) { - case FailToLoad: - index.delete(); - raf = new RandomAccessFile(index, "rw"); - raf.seek(index.length()); - raf.writeBytes("corrupt"); - break; - case LoadInvalid: - // page size 0 - raf.seek(0); - raf.writeBytes("corrupt and cannot load metadata"); - break; - case LoadCorrupt: - // loadable but invalid metadata - // location of order index low priority index for first destination... - raf.seek(8 * 1024 + 57); - raf.writeLong(Integer.MAX_VALUE - 10); - break; - case LoadOrderIndex0: - // loadable but invalid metadata - // location of order index default priority index size - // so looks like there are no ids in the order index - // picked up by setCheckForCorruptJournalFiles - raf.seek(12 * 1024 + 21); - raf.writeShort(0); - raf.writeChar(0); - raf.writeLong(-1); - break; - default: - } - raf.close(); - - // starting broker - BrokerService broker = new BrokerService(); - KahaDBStore kaha = new KahaDBStore(); - kaha.setCheckForCorruptJournalFiles(failTest == CorruptionType.LoadOrderIndex0); - // uncomment if you want to test archiving - //kaha.setArchiveCorruptedIndex(true); - kaha.setDirectory(new File(kahaDbDirectoryName)); - broker.setPersistenceAdapter(kaha); - return broker; - } - - public static Test suite() { - return suite(KahaDBStoreRecoveryBrokerTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - - public void initCombosForTestLargeQueuePersistentMessagesNotLostOnRestart() { - this.addCombinationValues("failTest", new CorruptionType[]{CorruptionType.FailToLoad, CorruptionType.LoadInvalid, CorruptionType.LoadCorrupt, CorruptionType.LoadOrderIndex0}); - } - - public void testLargeQueuePersistentMessagesNotLostOnRestart() throws Exception { - - ActiveMQDestination destination = new ActiveMQQueue("TEST"); - - // Setup the producer and send the message. - StubConnection connection = createConnection(); - ConnectionInfo connectionInfo = createConnectionInfo(); - SessionInfo sessionInfo = createSessionInfo(connectionInfo); - ProducerInfo producerInfo = createProducerInfo(sessionInfo); - connection.send(connectionInfo); - connection.send(sessionInfo); - connection.send(producerInfo); - - ArrayList expected = new ArrayList<>(); - - int MESSAGE_COUNT = 10000; - for (int i = 0; i < MESSAGE_COUNT; i++) { - Message message = createMessage(producerInfo, destination); - message.setPersistent(true); - connection.send(message); - expected.add(message.getMessageId().toString()); - } - connection.request(closeConnectionInfo(connectionInfo)); - - // restart the broker. - restartBroker(); - - // Setup the consumer and receive the message. - connection = createConnection(); - connectionInfo = createConnectionInfo(); - sessionInfo = createSessionInfo(connectionInfo); - connection.send(connectionInfo); - connection.send(sessionInfo); - ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); - connection.send(consumerInfo); - producerInfo = createProducerInfo(sessionInfo); - connection.send(producerInfo); - - for (int i = 0; i < MESSAGE_COUNT / 2; i++) { - Message m = receiveMessage(connection); - assertNotNull("Should have received message " + expected.get(0) + " by now!", m); - assertEquals(expected.remove(0), m.getMessageId().toString()); - MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE); - connection.send(ack); - } - - connection.request(closeConnectionInfo(connectionInfo)); - - // restart the broker. - restartBroker(); - - // Setup the consumer and receive the message. - connection = createConnection(); - connectionInfo = createConnectionInfo(); - sessionInfo = createSessionInfo(connectionInfo); - connection.send(connectionInfo); - connection.send(sessionInfo); - consumerInfo = createConsumerInfo(sessionInfo, destination); - connection.send(consumerInfo); - - for (int i = 0; i < MESSAGE_COUNT / 2; i++) { - Message m = receiveMessage(connection); - assertNotNull("Should have received message " + expected.get(i) + " by now!", m); - assertEquals(expected.get(i), m.getMessageId().toString()); - MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE); - connection.send(ack); - - } - - connection.request(closeConnectionInfo(connectionInfo)); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java deleted file mode 100644 index 6ed4000..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - -import java.util.concurrent.TimeUnit; - -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.BaseDestination; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; -import org.apache.activemq.command.ActiveMQQueue; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class KahaDBStoreRecoveryExpiryTest { - - private BrokerService broker; - private ActiveMQConnection connection; - private final Destination destination = new ActiveMQQueue("Test"); - private Session session; - - @Test - public void testRestartWitExpired() throws Exception { - publishMessages(1, 0); - publishMessages(1, 2000); - publishMessages(1, 0); - restartBroker(3000); - consumeMessages(2); - } - - @Test - public void testRestartWitExpiredLargerThanBatchRecovery() throws Exception { - publishMessages(BaseDestination.MAX_PAGE_SIZE + 10, 2000); - publishMessages(10, 0); - restartBroker(3000); - consumeMessages(10); - } - - private void consumeMessages(int count) throws Exception { - MessageConsumer consumer = session.createConsumer(destination); - for (int i = 0; i < count; i++) { - assertNotNull("got message " + i, consumer.receive(4000)); - } - assertNull("none left over", consumer.receive(2000)); - } - - private void restartBroker(int restartDelay) throws Exception { - stopBroker(); - TimeUnit.MILLISECONDS.sleep(restartDelay); - startBroker(); - } - - @After - public void stopBroker() throws Exception { - broker.stop(); - broker.waitUntilStopped(); - } - - private void publishMessages(int count, int expiry) throws Exception { - MessageProducer producer = session.createProducer(destination); - for (int i = 0; i < count; i++) { - producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 5, expiry); - } - } - - @Before - public void startBroker() throws Exception { - broker = new BrokerService(); - ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setIndexCacheSize(0); - PolicyMap policyMap = new PolicyMap(); - PolicyEntry defaultEntry = new PolicyEntry(); - defaultEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); - policyMap.setDefaultEntry(defaultEntry); - broker.setDestinationPolicy(policyMap); - broker.setUseJmx(false); - broker.start(); - - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false"); - connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.setWatchTopicAdvisories(false); - connection.start(); - - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreTest.java deleted file mode 100644 index 1b9980f..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import java.util.Vector; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.ProducerId; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertTrue; - -public class KahaDBStoreTest { - - KahaDBStore.KahaDBMessageStore underTest; - KahaDBStore store; - ActiveMQMessage message; - ProducerId producerId = new ProducerId("1.1.1"); - private static final int MESSAGE_COUNT = 2000; - private Vector exceptions = new Vector<>(); - - @Before - public void initStore() throws Exception { - ActiveMQDestination destination = new ActiveMQQueue("Test"); - store = new KahaDBStore(); - store.setMaxAsyncJobs(100); - store.setDeleteAllMessages(true); - store.start(); - underTest = store.new KahaDBMessageStore(destination); - underTest.start(); - message = new ActiveMQMessage(); - message.setDestination(destination); - } - - @After - public void destroyStore() throws Exception { - if (store != null) { - store.stop(); - } - } - - @Test - public void testConcurrentStoreAndDispatchQueue() throws Exception { - - ExecutorService executor = Executors.newCachedThreadPool(); - for (int i = 0; i < MESSAGE_COUNT; i++) { - final int id = ++i; - executor.execute(new Runnable() { - @Override - public void run() { - try { - Message msg = message.copy(); - msg.setMessageId(new MessageId(producerId, id)); - underTest.asyncAddQueueMessage(null, msg); - } - catch (Exception e) { - exceptions.add(e); - } - } - }); - } - - ExecutorService executor2 = Executors.newCachedThreadPool(); - for (int i = 0; i < MESSAGE_COUNT; i++) { - final int id = ++i; - executor2.execute(new Runnable() { - @Override - public void run() { - try { - MessageAck ack = new MessageAck(); - ack.setLastMessageId(new MessageId(producerId, id)); - underTest.removeAsyncMessage(null, ack); - } - catch (Exception e) { - exceptions.add(e); - } - } - }); - } - - executor.shutdown(); - executor.awaitTermination(60, TimeUnit.SECONDS); - - executor2.shutdown(); - executor2.awaitTermination(60, TimeUnit.SECONDS); - - assertTrue("no exceptions " + exceptions, exceptions.isEmpty()); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java deleted file mode 100644 index 3b63758..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java +++ /dev/null @@ -1,241 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; - -/** - * @author chirino - */ -public class KahaDBTest extends TestCase { - - protected BrokerService createBroker(KahaDBStore kaha) throws Exception { - - BrokerService broker = new BrokerService(); - broker.setUseJmx(false); - broker.setPersistenceAdapter(kaha); - broker.start(); - return broker; - } - - private KahaDBStore createStore(boolean delete) throws IOException { - KahaDBStore kaha = new KahaDBStore(); - kaha.setDirectory(new File("target/activemq-data/kahadb")); - if (delete) { - kaha.deleteAllMessages(); - } - return kaha; - } - - public void testIgnoreMissingJournalfilesOptionSetFalse() throws Exception { - KahaDBStore kaha = createStore(true); - kaha.setJournalMaxFileLength(1024 * 100); - assertFalse(kaha.isIgnoreMissingJournalfiles()); - BrokerService broker = createBroker(kaha); - sendMessages(1000); - broker.stop(); - - // Delete some journal files.. - assertExistsAndDelete(new File(kaha.getDirectory(), "db-4.log")); - assertExistsAndDelete(new File(kaha.getDirectory(), "db-8.log")); - - kaha = createStore(false); - kaha.setJournalMaxFileLength(1024 * 100); - assertFalse(kaha.isIgnoreMissingJournalfiles()); - try { - broker = createBroker(kaha); - fail("expected IOException"); - } - catch (IOException e) { - assertTrue(e.getMessage().startsWith("Detected missing/corrupt journal files")); - } - - } - - public void testIgnoreMissingJournalfilesOptionSetTrue() throws Exception { - KahaDBStore kaha = createStore(true); - kaha.setJournalMaxFileLength(1024 * 100); - assertFalse(kaha.isIgnoreMissingJournalfiles()); - BrokerService broker = createBroker(kaha); - sendMessages(1000); - broker.stop(); - - // Delete some journal files.. - assertExistsAndDelete(new File(kaha.getDirectory(), "db-4.log")); - assertExistsAndDelete(new File(kaha.getDirectory(), "db-8.log")); - - kaha = createStore(false); - kaha.setIgnoreMissingJournalfiles(true); - kaha.setJournalMaxFileLength(1024 * 100); - broker = createBroker(kaha); - - // We know we won't get all the messages but we should get most of them. - int count = receiveMessages(); - assertTrue(count > 800); - assertTrue(count < 1000); - - broker.stop(); - } - - public void testCheckCorruptionNotIgnored() throws Exception { - KahaDBStore kaha = createStore(true); - assertTrue(kaha.isChecksumJournalFiles()); - assertFalse(kaha.isCheckForCorruptJournalFiles()); - - kaha.setJournalMaxFileLength(1024 * 100); - kaha.setChecksumJournalFiles(true); - BrokerService broker = createBroker(kaha); - sendMessages(1000); - broker.stop(); - - // Modify/Corrupt some journal files.. - assertExistsAndCorrupt(new File(kaha.getDirectory(), "db-4.log")); - assertExistsAndCorrupt(new File(kaha.getDirectory(), "db-8.log")); - - kaha = createStore(false); - kaha.setJournalMaxFileLength(1024 * 100); - kaha.setChecksumJournalFiles(true); - kaha.setCheckForCorruptJournalFiles(true); - assertFalse(kaha.isIgnoreMissingJournalfiles()); - try { - broker = createBroker(kaha); - fail("expected IOException"); - } - catch (IOException e) { - assertTrue(e.getMessage().startsWith("Detected missing/corrupt journal files")); - } - - } - - public void testMigrationOnNewDefaultForChecksumJournalFiles() throws Exception { - KahaDBStore kaha = createStore(true); - kaha.setChecksumJournalFiles(false); - assertFalse(kaha.isChecksumJournalFiles()); - assertFalse(kaha.isCheckForCorruptJournalFiles()); - - kaha.setJournalMaxFileLength(1024 * 100); - BrokerService broker = createBroker(kaha); - sendMessages(1000); - broker.stop(); - - kaha = createStore(false); - kaha.setJournalMaxFileLength(1024 * 100); - kaha.setCheckForCorruptJournalFiles(true); - assertFalse(kaha.isIgnoreMissingJournalfiles()); - createBroker(kaha); - assertEquals(1000, receiveMessages()); - } - - private void assertExistsAndCorrupt(File file) throws IOException { - assertTrue(file.exists()); - RandomAccessFile f = new RandomAccessFile(file, "rw"); - try { - f.seek(1024 * 5 + 134); - f.write("... corruption string ...".getBytes()); - } - finally { - f.close(); - } - } - - public void testCheckCorruptionIgnored() throws Exception { - KahaDBStore kaha = createStore(true); - kaha.setJournalMaxFileLength(1024 * 100); - BrokerService broker = createBroker(kaha); - sendMessages(1000); - broker.stop(); - - // Delete some journal files.. - assertExistsAndCorrupt(new File(kaha.getDirectory(), "db-4.log")); - assertExistsAndCorrupt(new File(kaha.getDirectory(), "db-8.log")); - - kaha = createStore(false); - kaha.setIgnoreMissingJournalfiles(true); - kaha.setJournalMaxFileLength(1024 * 100); - kaha.setCheckForCorruptJournalFiles(true); - broker = createBroker(kaha); - - // We know we won't get all the messages but we should get most of them. - int count = receiveMessages(); - assertTrue("Expected to received a min # of messages.. Got: " + count, count > 990); - assertTrue(count < 1000); - - broker.stop(); - } - - private void assertExistsAndDelete(File file) { - assertTrue(file.exists()); - file.delete(); - assertFalse(file.exists()); - } - - private void sendMessages(int count) throws JMSException { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = cf.createConnection(); - try { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST")); - for (int i = 0; i < count; i++) { - producer.send(session.createTextMessage(createContent(i))); - } - } - finally { - connection.close(); - } - } - - private int receiveMessages() throws JMSException { - int rc = 0; - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = cf.createConnection(); - try { - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue("TEST")); - while (messageConsumer.receive(1000) != null) { - rc++; - } - return rc; - } - finally { - connection.close(); - } - } - - private String createContent(int i) { - StringBuilder sb = new StringBuilder(i + ":"); - while (sb.length() < 1024) { - sb.append("*"); - } - return sb.toString(); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db-1.log ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db-1.log b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db-1.log deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db.data ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db.data b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db.data deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db.redo ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db.redo b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db.redo deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db-1.log ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db-1.log b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db-1.log deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db.data ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db.data b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db.data deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db.redo ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db.redo b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db.redo deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db-1.log ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db-1.log b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db-1.log deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db.data ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db.data b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db.data deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db.redo ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db.redo b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db.redo deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db-1.log ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db-1.log b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db-1.log deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db.data ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db.data b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db.data deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db.redo ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db.redo b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db.redo deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java deleted file mode 100644 index e1b42ad..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import java.io.File; -import java.io.IOException; -import java.security.ProtectionDomain; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.util.IOHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author chirino - */ -public class KahaDBVersionTest extends TestCase { - - static String basedir; - - static { - try { - ProtectionDomain protectionDomain = KahaDBVersionTest.class.getProtectionDomain(); - basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalPath(); - } - catch (IOException e) { - basedir = "."; - } - } - - static final Logger LOG = LoggerFactory.getLogger(KahaDBVersionTest.class); - final static File VERSION_1_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1"); - final static File VERSION_2_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2"); - final static File VERSION_3_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3"); - final static File VERSION_4_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4"); - - BrokerService broker = null; - - protected BrokerService createBroker(KahaDBPersistenceAdapter kaha) throws Exception { - broker = new BrokerService(); - broker.setUseJmx(false); - broker.setPersistenceAdapter(kaha); - broker.start(); - return broker; - } - - @Override - protected void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - } - } - - public void XtestCreateStore() throws Exception { - KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); - File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4"); - IOHelper.deleteFile(dir); - kaha.setDirectory(dir); - kaha.setJournalMaxFileLength(1024 * 1024); - BrokerService broker = createBroker(kaha); - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = cf.createConnection(); - connection.setClientID("test"); - connection.start(); - producerSomeMessages(connection, 1000); - connection.close(); - broker.stop(); - } - - private void producerSomeMessages(Connection connection, int numToSend) throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic("test.topic"); - Queue queue = session.createQueue("test.queue"); - MessageConsumer consumer = session.createDurableSubscriber(topic, "test"); - consumer.close(); - MessageProducer producer = session.createProducer(topic); - producer.setPriority(9); - for (int i = 0; i < numToSend; i++) { - Message msg = session.createTextMessage("test message:" + i); - producer.send(msg); - } - LOG.info("sent " + numToSend + " to topic"); - producer = session.createProducer(queue); - for (int i = 0; i < numToSend; i++) { - Message msg = session.createTextMessage("test message:" + i); - producer.send(msg); - } - LOG.info("sent " + numToSend + " to queue"); - } - - public void testVersion1Conversion() throws Exception { - doConvertRestartCycle(VERSION_1_DB); - } - - public void testVersion2Conversion() throws Exception { - doConvertRestartCycle(VERSION_2_DB); - } - - public void testVersion3Conversion() throws Exception { - doConvertRestartCycle(VERSION_3_DB); - } - - public void testVersion4Conversion() throws Exception { - doConvertRestartCycle(VERSION_4_DB); - } - - public void doConvertRestartCycle(File existingStore) throws Exception { - - File testDir = new File("target/activemq-data/kahadb/versionDB"); - IOHelper.deleteFile(testDir); - IOHelper.copyFile(existingStore, testDir); - final int numToSend = 1000; - - // on repeat store will be upgraded - for (int repeats = 0; repeats < 3; repeats++) { - KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); - kaha.setDirectory(testDir); - kaha.setJournalMaxFileLength(1024 * 1024); - BrokerService broker = createBroker(kaha); - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = cf.createConnection(); - connection.setClientID("test"); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic("test.topic"); - Queue queue = session.createQueue("test.queue"); - - if (repeats > 0) { - // upgraded store will be empty so generated some more messages - producerSomeMessages(connection, numToSend); - } - - MessageConsumer queueConsumer = session.createConsumer(queue); - int count = 0; - for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) { - TextMessage msg = (TextMessage) queueConsumer.receive(10000); - count++; - // System.err.println(msg.getText()); - assertNotNull(msg); - } - LOG.info("Consumed " + count + " from queue"); - count = 0; - MessageConsumer topicConsumer = session.createDurableSubscriber(topic, "test"); - for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) { - TextMessage msg = (TextMessage) topicConsumer.receive(10000); - count++; - // System.err.println(msg.getText()); - assertNotNull("" + count, msg); - } - LOG.info("Consumed " + count + " from topic"); - connection.close(); - - broker.stop(); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/NoSpaceIOTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/NoSpaceIOTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/NoSpaceIOTest.java deleted file mode 100644 index 30e79c9..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/NoSpaceIOTest.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import java.io.File; -import java.io.RandomAccessFile; -import java.util.concurrent.atomic.AtomicLong; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.Ignore; -import org.junit.Test; - -public class NoSpaceIOTest { - - private static final Logger LOG = LoggerFactory.getLogger(NoSpaceIOTest.class); - - // need an app to input to console in intellij idea - public static void main(String[] args) throws Exception { - new NoSpaceIOTest().testRunOutOfSpace(); - } - - // handy way to validate some out of space related errors with a usb key - // allow it to run out of space, delete toDelete and see it recover - @Ignore("needs small volume, like usb key") - @Test - public void testRunOutOfSpace() throws Exception { - BrokerService broker = new BrokerService(); - File dataDir = new File("/Volumes/NO NAME/"); - File useUpSpace = new File(dataDir, "bigFile"); - if (!useUpSpace.exists()) { - LOG.info("using up some space..."); - RandomAccessFile filler = new RandomAccessFile(useUpSpace, "rw"); - filler.setLength(1024 * 1024 * 1212); // use ~1.xG of 2G (usb) volume - filler.close(); - File toDelete = new File(dataDir, "toDelete"); - filler = new RandomAccessFile(toDelete, "rw"); - filler.setLength(1024 * 1024 * 32 * 10); // 10 data files - filler.close(); - } - broker.setDataDirectoryFile(dataDir); - broker.start(); - AtomicLong consumed = new AtomicLong(0); - consume(consumed); - LOG.info("consumed: " + consumed); - - broker.getPersistenceAdapter().checkpoint(true); - - AtomicLong sent = new AtomicLong(0); - try { - produce(sent, 200); - } - catch (Exception expected) { - LOG.info("got ex, sent: " + sent); - } - LOG.info("sent: " + sent); - System.out.println("Remove toDelete file and press any key to continue"); - int read = System.in.read(); - System.err.println("read:" + read); - - LOG.info("Trying to send again: " + sent); - try { - produce(sent, 200); - } - catch (Exception expected) { - LOG.info("got ex, sent: " + sent); - } - LOG.info("sent: " + sent); - } - - private void consume(AtomicLong consumed) throws JMSException { - Connection c = new ActiveMQConnectionFactory("vm://localhost").createConnection(); - try { - c.start(); - Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = s.createConsumer(new ActiveMQQueue("t")); - while (consumer.receive(2000) != null) { - consumed.incrementAndGet(); - } - } - finally { - c.close(); - } - } - - private void produce(AtomicLong sent, long toSend) throws JMSException { - Connection c = new ActiveMQConnectionFactory("vm://localhost").createConnection(); - try { - c.start(); - Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = s.createProducer(new ActiveMQQueue("t")); - TextMessage m = s.createTextMessage(); - m.setText(String.valueOf(new char[1024 * 1024])); - for (int i = 0; i < toSend; i++) { - producer.send(m); - sent.incrementAndGet(); - } - } - finally { - c.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java deleted file mode 100644 index f225dee..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import java.io.IOException; - -import junit.framework.TestCase; - -import org.apache.activemq.protobuf.Buffer; -import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; -import org.apache.activemq.store.kahadb.data.KahaDestination; -import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; -import org.apache.activemq.store.kahadb.data.KahaEntryType; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.DataByteArrayInputStream; -import org.apache.activemq.util.DataByteArrayOutputStream; - -public class PBMesssagesTest extends TestCase { - - @SuppressWarnings("rawtypes") - public void testKahaAddMessageCommand() throws IOException { - - KahaAddMessageCommand expected = new KahaAddMessageCommand(); - expected.setDestination(new KahaDestination().setName("Foo").setType(DestinationType.QUEUE)); - expected.setMessage(new Buffer(new byte[]{1, 2, 3, 4, 5, 6})); - expected.setMessageId("Hello World"); - - int size = expected.serializedSizeFramed(); - DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); - os.writeByte(expected.type().getNumber()); - expected.writeFramed(os); - ByteSequence seq = os.toByteSequence(); - - DataByteArrayInputStream is = new DataByteArrayInputStream(seq); - KahaEntryType type = KahaEntryType.valueOf(is.readByte()); - JournalCommand message = (JournalCommand) type.createMessage(); - message.mergeFramed(is); - - assertEquals(expected, message); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/TempKahaDBStoreBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/TempKahaDBStoreBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/TempKahaDBStoreBrokerTest.java deleted file mode 100644 index 4316fc7..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/TempKahaDBStoreBrokerTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import java.io.File; - -import junit.framework.Test; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerTest; - -/** - * Once the wire format is completed we can test against real persistence storage. - */ -public class TempKahaDBStoreBrokerTest extends BrokerTest { - - @Override - protected BrokerService createBroker() throws Exception { - BrokerService broker = new BrokerService(); - KahaDBStore kaha = new KahaDBStore(); - kaha.setDirectory(new File("target/activemq-data/kahadb")); - kaha.deleteAllMessages(); - broker.setPersistenceAdapter(kaha); - return broker; - } - - protected BrokerService createRestartedBroker() throws Exception { - BrokerService broker = new BrokerService(); - TempKahaDBStore kaha = new TempKahaDBStore(); - kaha.setDirectory(new File("target/activemq-data/kahadb")); - broker.setPersistenceAdapter(kaha); - return broker; - } - - public static Test suite() { - return suite(TempKahaDBStoreBrokerTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java deleted file mode 100644 index 1261959..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb.perf; - -import java.io.File; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.BytesMessage; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.Test; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.JmsTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.store.kahadb.KahaDBStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This tests bulk loading and unloading of messages to a Queue.s - */ -public class KahaBulkLoadingTest extends JmsTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(KahaBulkLoadingTest.class); - - protected int messageSize = 1024 * 4; - - @Override - protected BrokerService createBroker() throws Exception { - BrokerService broker = new BrokerService(); - KahaDBStore kaha = new KahaDBStore(); - kaha.setDirectory(new File("target/activemq-data/kahadb")); - // kaha.deleteAllMessages(); - broker.setPersistenceAdapter(kaha); - broker.addConnector("tcp://localhost:0"); - return broker; - } - - @Override - protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getServer().getConnectURI()); - factory.setUseAsyncSend(true); - return factory; - } - - public void testQueueSendThenAddConsumer() throws Exception { - long start; - long end; - ActiveMQDestination destination = new ActiveMQQueue("TEST"); - - connection.setUseCompression(false); - connection.getPrefetchPolicy().setAll(10); - connection.start(); - - Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); - - LOG.info("Receiving messages that are in the queue"); - MessageConsumer consumer = session.createConsumer(destination); - BytesMessage msg = (BytesMessage) consumer.receive(2000); - int consumed = 0; - if (msg != null) { - consumed++; - } - while (true) { - int counter = 0; - if (msg == null) { - break; - } - end = start = System.currentTimeMillis(); - int size = 0; - while ((end - start) < 5000) { - msg = (BytesMessage) consumer.receive(5000); - if (msg == null) { - break; - } - counter++; - consumed++; - end = System.currentTimeMillis(); - size += msg.getBodyLength(); - } - LOG.info("Consumed: " + (counter * 1000.0 / (end - start)) + " " + " messages/sec, " + (1.0 * size / (1024.0 * 1024.0)) * ((1000.0 / (end - start))) + " megs/sec "); - } - consumer.close(); - LOG.info("Consumed " + consumed + " messages from the queue."); - - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - LOG.info("Sending messages that are " + (messageSize / 1024.0) + "k large"); - // Send a message to the broker. - start = System.currentTimeMillis(); - - final AtomicBoolean stop = new AtomicBoolean(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - stop.set(true); - } - }); - - int produced = 0; - while (!stop.get()) { - end = start = System.currentTimeMillis(); - int produceCount = 0; - while ((end - start) < 5000 && !stop.get()) { - BytesMessage bm = session.createBytesMessage(); - bm.writeBytes(new byte[messageSize]); - producer.send(bm); - produceCount++; - produced++; - end = System.currentTimeMillis(); - } - LOG.info("Produced: " + (produceCount * 1000.0 / (end - start)) + " messages/sec, " + (1.0 * produceCount * messageSize / (1024.0 * 1024.0)) * ((1000.0 / (end - start))) + " megs/sec"); - } - LOG.info("Prodcued " + produced + " messages to the queue."); - - } - - public static Test suite() { - return suite(KahaBulkLoadingTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java deleted file mode 100644 index 5d52adb..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb.perf; - -import java.io.File; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.perf.SimpleDurableTopicTest; -import org.apache.activemq.store.kahadb.KahaDBStore; - -/** - * - */ -public class KahaStoreDurableTopicTest extends SimpleDurableTopicTest { - - @Override - protected void configureBroker(BrokerService answer, String uri) throws Exception { - File dataFileDir = new File("target/test-amq-data/perfTest/amqdb"); - dataFileDir.mkdirs(); - // answer.setDeleteAllMessagesOnStartup(true); - - KahaDBStore adaptor = new KahaDBStore(); - adaptor.setDirectory(dataFileDir); - - answer.setDataDirectoryFile(dataFileDir); - answer.setPersistenceAdapter(adaptor); - answer.addConnector(uri); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b27695/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java deleted file mode 100644 index a2898f9..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb.perf; - -import java.io.File; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.perf.SimpleQueueTest; -import org.apache.activemq.store.kahadb.KahaDBStore; - -/** - * - */ -public class KahaStoreQueueTest extends SimpleQueueTest { - - @Override - protected void configureBroker(BrokerService answer, String uri) throws Exception { - File dataFileDir = new File("target/test-amq-data/perfTest/amqdb"); - dataFileDir.mkdirs(); - answer.setDeleteAllMessagesOnStartup(true); - - KahaDBStore adaptor = new KahaDBStore(); - adaptor.setDirectory(dataFileDir); - - answer.setDataDirectoryFile(dataFileDir); - answer.setPersistenceAdapter(adaptor); - answer.addConnector(uri); - } - -} -