Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EE85F200B80 for ; Wed, 14 Sep 2016 12:53:10 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id ED2C8160ADA; Wed, 14 Sep 2016 10:53:10 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CF2FD160AE2 for ; Wed, 14 Sep 2016 12:53:07 +0200 (CEST) Received: (qmail 5185 invoked by uid 500); 14 Sep 2016 10:53:07 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 5060 invoked by uid 99); 14 Sep 2016 10:53:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Sep 2016 10:53:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D5871DFCC0; Wed, 14 Sep 2016 10:53:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ptupitsyn@apache.org To: commits@ignite.apache.org Date: Wed, 14 Sep 2016 10:53:18 -0000 Message-Id: <0aeae6782ca245f9bea518ea29c09dcf@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [13/35] ignite git commit: IGNITE-3172 Refactoring Ignite-Cassandra serializers. - Fixes #956. 
archived-at: Wed, 14 Sep 2016 10:53:11 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java new file mode 100644 index 0000000..26cca68 --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tests; + +import java.util.Collection; +import java.util.Map; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; +import org.apache.ignite.tests.pojos.Person; +import org.apache.ignite.tests.pojos.PersonId; +import org.apache.ignite.tests.utils.CacheStoreHelper; +import org.apache.ignite.tests.utils.CassandraHelper; +import org.apache.ignite.tests.utils.TestsHelper; +import org.apache.log4j.Logger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.springframework.core.io.ClassPathResource; + +/** + * Unit tests for {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore} implementation of + * {@link org.apache.ignite.cache.store.CacheStore} which allows to store Ignite cache data into Cassandra tables. + */ +public class CassandraDirectPersistenceTest { + /** */ + private static final Logger LOGGER = Logger.getLogger(CassandraDirectPersistenceTest.class.getName()); + + /** */ + @BeforeClass + public static void setUpClass() { + if (CassandraHelper.useEmbeddedCassandra()) { + try { + CassandraHelper.startEmbeddedCassandra(LOGGER); + } + catch (Throwable e) { + throw new RuntimeException("Failed to start embedded Cassandra instance", e); + } + } + + LOGGER.info("Testing admin connection to Cassandra"); + CassandraHelper.testAdminConnection(); + + LOGGER.info("Testing regular connection to Cassandra"); + CassandraHelper.testRegularConnection(); + + LOGGER.info("Dropping all artifacts from previous tests execution session"); + CassandraHelper.dropTestKeyspaces(); + + LOGGER.info("Start tests execution"); + } + + /** */ + @AfterClass + public static void tearDownClass() { + try { + CassandraHelper.dropTestKeyspaces(); + } + finally { + CassandraHelper.releaseCassandraResources(); + + if (CassandraHelper.useEmbeddedCassandra()) { + try { + CassandraHelper.stopEmbeddedCassandra(); + } + catch (Throwable e) { + 
LOGGER.error("Failed to stop embedded Cassandra instance", e); + } + } + } + } + + /** */ + @Test + @SuppressWarnings("unchecked") + public void primitiveStrategyTest() { + CacheStore store1 = CacheStoreHelper.createCacheStore("longTypes", + new ClassPathResource("org/apache/ignite/tests/persistence/primitive/persistence-settings-1.xml"), + CassandraHelper.getAdminDataSrc()); + + CacheStore store2 = CacheStoreHelper.createCacheStore("stringTypes", + new ClassPathResource("org/apache/ignite/tests/persistence/primitive/persistence-settings-2.xml"), + CassandraHelper.getAdminDataSrc()); + + Collection> longEntries = TestsHelper.generateLongsEntries(); + Collection> strEntries = TestsHelper.generateStringsEntries(); + + Collection fakeLongKeys = TestsHelper.getKeys(longEntries); + fakeLongKeys.add(-1L); + fakeLongKeys.add(-2L); + fakeLongKeys.add(-3L); + fakeLongKeys.add(-4L); + + Collection fakeStrKeys = TestsHelper.getKeys(strEntries); + fakeStrKeys.add("-1"); + fakeStrKeys.add("-2"); + fakeStrKeys.add("-3"); + fakeStrKeys.add("-4"); + + LOGGER.info("Running PRIMITIVE strategy write tests"); + + LOGGER.info("Running single operation write tests"); + store1.write(longEntries.iterator().next()); + store2.write(strEntries.iterator().next()); + LOGGER.info("Single operation write tests passed"); + + LOGGER.info("Running bulk operation write tests"); + store1.writeAll(longEntries); + store2.writeAll(strEntries); + LOGGER.info("Bulk operation write tests passed"); + + LOGGER.info("PRIMITIVE strategy write tests passed"); + + LOGGER.info("Running PRIMITIVE strategy read tests"); + + LOGGER.info("Running single operation read tests"); + + LOGGER.info("Running real keys read tests"); + + Long longVal = (Long)store1.load(longEntries.iterator().next().getKey()); + if (!longEntries.iterator().next().getValue().equals(longVal)) + throw new RuntimeException("Long values was incorrectly deserialized from Cassandra"); + + String strVal = 
(String)store2.load(strEntries.iterator().next().getKey()); + if (!strEntries.iterator().next().getValue().equals(strVal)) + throw new RuntimeException("String values was incorrectly deserialized from Cassandra"); + + LOGGER.info("Running fake keys read tests"); + + longVal = (Long)store1.load(-1L); + if (longVal != null) + throw new RuntimeException("Long value with fake key '-1' was found in Cassandra"); + + strVal = (String)store2.load("-1"); + if (strVal != null) + throw new RuntimeException("String value with fake key '-1' was found in Cassandra"); + + LOGGER.info("Single operation read tests passed"); + + LOGGER.info("Running bulk operation read tests"); + + LOGGER.info("Running real keys read tests"); + + Map longValues = store1.loadAll(TestsHelper.getKeys(longEntries)); + if (!TestsHelper.checkCollectionsEqual(longValues, longEntries)) + throw new RuntimeException("Long values was incorrectly deserialized from Cassandra"); + + Map strValues = store2.loadAll(TestsHelper.getKeys(strEntries)); + if (!TestsHelper.checkCollectionsEqual(strValues, strEntries)) + throw new RuntimeException("String values was incorrectly deserialized from Cassandra"); + + LOGGER.info("Running fake keys read tests"); + + longValues = store1.loadAll(fakeLongKeys); + if (!TestsHelper.checkCollectionsEqual(longValues, longEntries)) + throw new RuntimeException("Long values was incorrectly deserialized from Cassandra"); + + strValues = store2.loadAll(fakeStrKeys); + if (!TestsHelper.checkCollectionsEqual(strValues, strEntries)) + throw new RuntimeException("String values was incorrectly deserialized from Cassandra"); + + LOGGER.info("Bulk operation read tests passed"); + + LOGGER.info("PRIMITIVE strategy read tests passed"); + + LOGGER.info("Running PRIMITIVE strategy delete tests"); + + LOGGER.info("Deleting real keys"); + + store1.delete(longEntries.iterator().next().getKey()); + store1.deleteAll(TestsHelper.getKeys(longEntries)); + + 
store2.delete(strEntries.iterator().next().getKey()); + store2.deleteAll(TestsHelper.getKeys(strEntries)); + + LOGGER.info("Deleting fake keys"); + + store1.delete(-1L); + store2.delete("-1"); + + store1.deleteAll(fakeLongKeys); + store2.deleteAll(fakeStrKeys); + + LOGGER.info("PRIMITIVE strategy delete tests passed"); + } + + /** */ + @Test + @SuppressWarnings("unchecked") + public void blobStrategyTest() { + CacheStore store1 = CacheStoreHelper.createCacheStore("longTypes", + new ClassPathResource("org/apache/ignite/tests/persistence/blob/persistence-settings-1.xml"), + CassandraHelper.getAdminDataSrc()); + + CacheStore store2 = CacheStoreHelper.createCacheStore("personTypes", + new ClassPathResource("org/apache/ignite/tests/persistence/blob/persistence-settings-2.xml"), + CassandraHelper.getAdminDataSrc()); + + CacheStore store3 = CacheStoreHelper.createCacheStore("personTypes", + new ClassPathResource("org/apache/ignite/tests/persistence/blob/persistence-settings-3.xml"), + CassandraHelper.getAdminDataSrc()); + + Collection> longEntries = TestsHelper.generateLongsEntries(); + Collection> personEntries = TestsHelper.generateLongsPersonsEntries(); + + LOGGER.info("Running BLOB strategy write tests"); + + LOGGER.info("Running single operation write tests"); + store1.write(longEntries.iterator().next()); + store2.write(personEntries.iterator().next()); + store3.write(personEntries.iterator().next()); + LOGGER.info("Single operation write tests passed"); + + LOGGER.info("Running bulk operation write tests"); + store1.writeAll(longEntries); + store2.writeAll(personEntries); + store3.writeAll(personEntries); + LOGGER.info("Bulk operation write tests passed"); + + LOGGER.info("BLOB strategy write tests passed"); + + LOGGER.info("Running BLOB strategy read tests"); + + LOGGER.info("Running single operation read tests"); + + Long longVal = (Long)store1.load(longEntries.iterator().next().getKey()); + if (!longEntries.iterator().next().getValue().equals(longVal)) + throw 
new RuntimeException("Long values was incorrectly deserialized from Cassandra"); + + Person personVal = (Person)store2.load(personEntries.iterator().next().getKey()); + if (!personEntries.iterator().next().getValue().equals(personVal)) + throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + + personVal = (Person)store3.load(personEntries.iterator().next().getKey()); + if (!personEntries.iterator().next().getValue().equals(personVal)) + throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + + LOGGER.info("Single operation read tests passed"); + + LOGGER.info("Running bulk operation read tests"); + + Map longValues = store1.loadAll(TestsHelper.getKeys(longEntries)); + if (!TestsHelper.checkCollectionsEqual(longValues, longEntries)) + throw new RuntimeException("Long values was incorrectly deserialized from Cassandra"); + + Map personValues = store2.loadAll(TestsHelper.getKeys(personEntries)); + if (!TestsHelper.checkPersonCollectionsEqual(personValues, personEntries, false)) + throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + + personValues = store3.loadAll(TestsHelper.getKeys(personEntries)); + if (!TestsHelper.checkPersonCollectionsEqual(personValues, personEntries, false)) + throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + + LOGGER.info("Bulk operation read tests passed"); + + LOGGER.info("BLOB strategy read tests passed"); + + LOGGER.info("Running BLOB strategy delete tests"); + + store1.delete(longEntries.iterator().next().getKey()); + store1.deleteAll(TestsHelper.getKeys(longEntries)); + + store2.delete(personEntries.iterator().next().getKey()); + store2.deleteAll(TestsHelper.getKeys(personEntries)); + + store3.delete(personEntries.iterator().next().getKey()); + store3.deleteAll(TestsHelper.getKeys(personEntries)); + + LOGGER.info("BLOB strategy delete tests passed"); + } + + /** */ + @Test + 
@SuppressWarnings("unchecked") + public void pojoStrategyTest() { + CacheStore store1 = CacheStoreHelper.createCacheStore("longTypes", + new ClassPathResource("org/apache/ignite/tests/persistence/pojo/persistence-settings-1.xml"), + CassandraHelper.getAdminDataSrc()); + + CacheStore store2 = CacheStoreHelper.createCacheStore("personTypes", + new ClassPathResource("org/apache/ignite/tests/persistence/pojo/persistence-settings-2.xml"), + CassandraHelper.getAdminDataSrc()); + + CacheStore store3 = CacheStoreHelper.createCacheStore("personTypes", + new ClassPathResource("org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml"), + CassandraHelper.getAdminDataSrc()); + + Collection> entries1 = TestsHelper.generateLongsPersonsEntries(); + Collection> entries2 = TestsHelper.generatePersonIdsPersonsEntries(); + Collection> entries3 = TestsHelper.generatePersonIdsPersonsEntries(); + + LOGGER.info("Running POJO strategy write tests"); + + LOGGER.info("Running single operation write tests"); + store1.write(entries1.iterator().next()); + store2.write(entries2.iterator().next()); + store3.write(entries3.iterator().next()); + LOGGER.info("Single operation write tests passed"); + + LOGGER.info("Running bulk operation write tests"); + store1.writeAll(entries1); + store2.writeAll(entries2); + store3.writeAll(entries3); + LOGGER.info("Bulk operation write tests passed"); + + LOGGER.info("POJO strategy write tests passed"); + + LOGGER.info("Running POJO strategy read tests"); + + LOGGER.info("Running single operation read tests"); + + Person person = (Person)store1.load(entries1.iterator().next().getKey()); + if (!entries1.iterator().next().getValue().equalsPrimitiveFields(person)) + throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + + person = (Person)store2.load(entries2.iterator().next().getKey()); + if (!entries2.iterator().next().getValue().equalsPrimitiveFields(person)) + throw new RuntimeException("Person values was 
incorrectly deserialized from Cassandra"); + + person = (Person)store3.load(entries3.iterator().next().getKey()); + if (!entries3.iterator().next().getValue().equals(person)) + throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + + LOGGER.info("Single operation read tests passed"); + + LOGGER.info("Running bulk operation read tests"); + + Map persons = store1.loadAll(TestsHelper.getKeys(entries1)); + if (!TestsHelper.checkPersonCollectionsEqual(persons, entries1, true)) + throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + + persons = store2.loadAll(TestsHelper.getKeys(entries2)); + if (!TestsHelper.checkPersonCollectionsEqual(persons, entries2, true)) + throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + + persons = store3.loadAll(TestsHelper.getKeys(entries3)); + if (!TestsHelper.checkPersonCollectionsEqual(persons, entries3, false)) + throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + + LOGGER.info("Bulk operation read tests passed"); + + LOGGER.info("POJO strategy read tests passed"); + + LOGGER.info("Running POJO strategy delete tests"); + + store1.delete(entries1.iterator().next().getKey()); + store1.deleteAll(TestsHelper.getKeys(entries1)); + + store2.delete(entries2.iterator().next().getKey()); + store2.deleteAll(TestsHelper.getKeys(entries2)); + + store3.delete(entries3.iterator().next().getKey()); + store3.deleteAll(TestsHelper.getKeys(entries3)); + + LOGGER.info("POJO strategy delete tests passed"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java new 
file mode 100644 index 0000000..5de3097 --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests; + +import java.net.URL; +import org.apache.ignite.cache.store.cassandra.utils.DDLGenerator; +import org.junit.Test; + +/** + * DDLGenerator test. 
+ */ +public class DDLGeneratorTest { + @Test + @SuppressWarnings("unchecked") + /** */ + public void generatorTest() { + ClassLoader clsLdr = DDLGeneratorTest.class.getClassLoader(); + + URL url1 = clsLdr.getResource("org/apache/ignite/tests/persistence/primitive/persistence-settings-1.xml"); + String file1 = url1.getFile(); // TODO IGNITE-1371 Possible NPE + + URL url2 = clsLdr.getResource("org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml"); + String file2 = url2.getFile(); // TODO IGNITE-1371 Possible NPE + + DDLGenerator.main(new String[]{file1, file2}); + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreLoadTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreLoadTest.java new file mode 100644 index 0000000..bfcf751 --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreLoadTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests; + +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.tests.load.LoadTestDriver; +import org.apache.ignite.tests.load.ignite.BulkReadWorker; +import org.apache.ignite.tests.load.ignite.BulkWriteWorker; +import org.apache.ignite.tests.load.ignite.ReadWorker; +import org.apache.ignite.tests.load.ignite.WriteWorker; +import org.apache.ignite.tests.utils.CassandraHelper; +import org.apache.ignite.tests.utils.TestsHelper; +import org.apache.log4j.Logger; + +/** + * Load tests for Ignite caches which utilizing {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore} + * to store cache data into Cassandra tables + */ +public class IgnitePersistentStoreLoadTest extends LoadTestDriver { + /** */ + private static final Logger LOGGER = Logger.getLogger("IgniteLoadTests"); + + /** + * test starter. + * + * @param args Test arguments. + */ + public static void main(String[] args) { + try { + LOGGER.info("Ignite load tests execution started"); + + LoadTestDriver driver = new IgnitePersistentStoreLoadTest(); + + /** + * Load test scripts could be executed from several machines. Current implementation can correctly, + * handle situation when Cassandra keyspace/table was dropped - for example by the same load test + * started a bit later on another machine. Moreover there is a warm up period for each load test. + * Thus all the delays related to keyspaces/tables recreation actions will not affect performance metrics, + * but it will be produced lots of "trash" output in the logs (related to correct handling of such + * exceptional situation and keyspace/table recreation). + * + * Thus dropping test keyspaces makes sense only for Unit tests, but not for Load tests. 
+ **/ + + //CassandraHelper.dropTestKeyspaces(); + + driver.runTest("WRITE", WriteWorker.class, WriteWorker.LOGGER_NAME); + + driver.runTest("BULK_WRITE", BulkWriteWorker.class, BulkWriteWorker.LOGGER_NAME); + + driver.runTest("READ", ReadWorker.class, ReadWorker.LOGGER_NAME); + + driver.runTest("BULK_READ", BulkReadWorker.class, BulkReadWorker.LOGGER_NAME); + + /** + * Load test script executed on one machine could complete earlier that the same load test executed from + * another machine. Current implementation can correctly handle situation when Cassandra keyspace/table + * was dropped (simply recreate it). But dropping keyspace/table during load tests execution and subsequent + * recreation of such objects can have SIGNIFICANT EFFECT on final performance metrics. + * + * Thus dropping test keyspaces at the end of the tests makes sense only for Unit tests, + * but not for Load tests. + */ + + //CassandraHelper.dropTestKeyspaces(); + + LOGGER.info("Ignite load tests execution completed"); + } + catch (Throwable e) { + LOGGER.error("Ignite load tests execution failed", e); + throw new RuntimeException("Ignite load tests execution failed", e); + } + finally { + CassandraHelper.releaseCassandraResources(); + } + } + + /** {@inheritDoc} */ + @Override protected Logger logger() { + return LOGGER; + } + + /** {@inheritDoc} */ + @Override protected Object setup(String logName) { + return Ignition.start(TestsHelper.getLoadTestsIgniteConfig()); + } + + /** {@inheritDoc} */ + @Override protected void tearDown(Object obj) { + Ignite ignite = (Ignite)obj; + + if (ignite != null) + ignite.close(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java 
b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java new file mode 100644 index 0000000..5da6ba2 --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tests; + +import java.util.Collection; +import java.util.Map; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; +import org.apache.ignite.tests.pojos.Person; +import org.apache.ignite.tests.pojos.PersonId; +import org.apache.ignite.tests.utils.CacheStoreHelper; +import org.apache.ignite.tests.utils.CassandraHelper; +import org.apache.ignite.tests.utils.TestsHelper; +import org.apache.log4j.Logger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.springframework.core.io.ClassPathResource; + +/** + * Unit tests for Ignite caches which utilizing {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore} + * to store cache data into Cassandra tables + */ +public class IgnitePersistentStoreTest { + /** */ + private static final Logger LOGGER = Logger.getLogger(IgnitePersistentStoreTest.class.getName()); + + /** */ + @BeforeClass + public static void setUpClass() { + if (CassandraHelper.useEmbeddedCassandra()) { + try { + CassandraHelper.startEmbeddedCassandra(LOGGER); + } + catch (Throwable e) { + throw new RuntimeException("Failed to start embedded Cassandra instance", e); + } + } + + LOGGER.info("Testing admin connection to Cassandra"); + CassandraHelper.testAdminConnection(); + + LOGGER.info("Testing regular connection to Cassandra"); + CassandraHelper.testRegularConnection(); + + LOGGER.info("Dropping all artifacts from previous tests execution session"); + CassandraHelper.dropTestKeyspaces(); + + LOGGER.info("Start tests execution"); + } + + /** */ + @AfterClass + public static void tearDownClass() { + try { + CassandraHelper.dropTestKeyspaces(); + } + finally { + 
CassandraHelper.releaseCassandraResources(); + + if (CassandraHelper.useEmbeddedCassandra()) { + try { + CassandraHelper.stopEmbeddedCassandra(); + } + catch (Throwable e) { + LOGGER.error("Failed to stop embedded Cassandra instance", e); + } + } + } + } + + /** */ + @Test + public void primitiveStrategyTest() { + Ignition.stopAll(true); + + Map longMap = TestsHelper.generateLongsMap(); + Map strMap = TestsHelper.generateStringsMap(); + + LOGGER.info("Running PRIMITIVE strategy write tests"); + + try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/primitive/ignite-config.xml")) { + IgniteCache longCache = ignite.getOrCreateCache(new CacheConfiguration("cache1")); + IgniteCache strCache = ignite.getOrCreateCache(new CacheConfiguration("cache2")); + + LOGGER.info("Running single operation write tests"); + longCache.put(1L, 1L); + strCache.put("1", "1"); + LOGGER.info("Single operation write tests passed"); + + LOGGER.info("Running bulk operation write tests"); + longCache.putAll(longMap); + strCache.putAll(strMap); + LOGGER.info("Bulk operation write tests passed"); + } + + LOGGER.info("PRIMITIVE strategy write tests passed"); + + Ignition.stopAll(true); + + try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/primitive/ignite-config.xml")) { + LOGGER.info("Running PRIMITIVE strategy read tests"); + + IgniteCache longCache = ignite.getOrCreateCache(new CacheConfiguration("cache1")); + IgniteCache strCache = ignite.getOrCreateCache(new CacheConfiguration("cache2")); + + LOGGER.info("Running single operation read tests"); + + Long longVal = longCache.get(1L); + if (!longVal.equals(longMap.get(1L))) + throw new RuntimeException("Long value was incorrectly deserialized from Cassandra"); + + String strVal = strCache.get("1"); + if (!strVal.equals(strMap.get("1"))) + throw new RuntimeException("String value was incorrectly deserialized from Cassandra"); + + LOGGER.info("Single operation read tests passed"); + + LOGGER.info("Running 
bulk operation read tests"); + + Map longMap1 = longCache.getAll(longMap.keySet()); + if (!TestsHelper.checkMapsEqual(longMap, longMap1)) + throw new RuntimeException("Long values batch was incorrectly deserialized from Cassandra"); + + Map strMap1 = strCache.getAll(strMap.keySet()); + if (!TestsHelper.checkMapsEqual(strMap, strMap1)) + throw new RuntimeException("String values batch was incorrectly deserialized from Cassandra"); + + LOGGER.info("Bulk operation read tests passed"); + + LOGGER.info("PRIMITIVE strategy read tests passed"); + + LOGGER.info("Running PRIMITIVE strategy delete tests"); + + longCache.remove(1L); + longCache.removeAll(longMap.keySet()); + + strCache.remove("1"); + strCache.removeAll(strMap.keySet()); + + LOGGER.info("PRIMITIVE strategy delete tests passed"); + } + } + + /** */ + @Test + public void blobStrategyTest() { + Ignition.stopAll(true); + + Map longMap = TestsHelper.generateLongsMap(); + Map personMap = TestsHelper.generateLongsPersonsMap(); + + LOGGER.info("Running BLOB strategy write tests"); + + try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/blob/ignite-config.xml")) { + IgniteCache longCache = ignite.getOrCreateCache(new CacheConfiguration("cache1")); + IgniteCache personCache = ignite.getOrCreateCache(new CacheConfiguration("cache2")); + + LOGGER.info("Running single operation write tests"); + longCache.put(1L, 1L); + personCache.put(1L, TestsHelper.generateRandomPerson()); + LOGGER.info("Single operation write tests passed"); + + LOGGER.info("Running bulk operation write tests"); + longCache.putAll(longMap); + personCache.putAll(personMap); + LOGGER.info("Bulk operation write tests passed"); + } + + LOGGER.info("BLOB strategy write tests passed"); + + Ignition.stopAll(true); + + try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/blob/ignite-config.xml")) { + LOGGER.info("Running BLOB strategy read tests"); + + IgniteCache longCache = ignite.getOrCreateCache(new 
CacheConfiguration("cache1")); + IgniteCache personCache = ignite.getOrCreateCache(new CacheConfiguration("cache2")); + + LOGGER.info("Running single operation read tests"); + + Long longVal = longCache.get(1L); + if (!longVal.equals(longMap.get(1L))) + throw new RuntimeException("Long value was incorrectly deserialized from Cassandra"); + + Person person = personCache.get(1L); + if (!person.equals(personMap.get(1L))) + throw new RuntimeException("Person value was incorrectly deserialized from Cassandra"); + + LOGGER.info("Single operation read tests passed"); + + LOGGER.info("Running bulk operation read tests"); + + Map longMap1 = longCache.getAll(longMap.keySet()); + if (!TestsHelper.checkMapsEqual(longMap, longMap1)) + throw new RuntimeException("Long values batch was incorrectly deserialized from Cassandra"); + + Map personMap1 = personCache.getAll(personMap.keySet()); + if (!TestsHelper.checkPersonMapsEqual(personMap, personMap1, false)) + throw new RuntimeException("Person values batch was incorrectly deserialized from Cassandra"); + + LOGGER.info("Bulk operation read tests passed"); + + LOGGER.info("BLOB strategy read tests passed"); + + LOGGER.info("Running BLOB strategy delete tests"); + + longCache.remove(1L); + longCache.removeAll(longMap.keySet()); + + personCache.remove(1L); + personCache.removeAll(personMap.keySet()); + + LOGGER.info("BLOB strategy delete tests passed"); + } + } + + /** */ + @Test + public void pojoStrategyTest() { + Ignition.stopAll(true); + + LOGGER.info("Running POJO strategy write tests"); + + Map personMap1 = TestsHelper.generateLongsPersonsMap(); + Map personMap2 = TestsHelper.generatePersonIdsPersonsMap(); + + try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) { + IgniteCache personCache1 = ignite.getOrCreateCache(new CacheConfiguration("cache1")); + IgniteCache personCache2 = ignite.getOrCreateCache(new CacheConfiguration("cache2")); + IgniteCache personCache3 = 
ignite.getOrCreateCache(new CacheConfiguration("cache3")); + + LOGGER.info("Running single operation write tests"); + personCache1.put(1L, TestsHelper.generateRandomPerson()); + personCache2.put(TestsHelper.generateRandomPersonId(), TestsHelper.generateRandomPerson()); + personCache3.put(TestsHelper.generateRandomPersonId(), TestsHelper.generateRandomPerson()); + LOGGER.info("Single operation write tests passed"); + + LOGGER.info("Running bulk operation write tests"); + personCache1.putAll(personMap1); + personCache2.putAll(personMap2); + personCache3.putAll(personMap2); + LOGGER.info("Bulk operation write tests passed"); + } + + LOGGER.info("POJO strategy write tests passed"); + + Ignition.stopAll(true); + + try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) { + LOGGER.info("Running POJO strategy read tests"); + + IgniteCache personCache1 = ignite.getOrCreateCache(new CacheConfiguration("cache1")); + IgniteCache personCache2 = ignite.getOrCreateCache(new CacheConfiguration("cache2")); + IgniteCache personCache3 = ignite.getOrCreateCache(new CacheConfiguration("cache3")); + + LOGGER.info("Running single operation read tests"); + Person person = personCache1.get(1L); + if (!person.equalsPrimitiveFields(personMap1.get(1L))) + throw new RuntimeException("Person value was incorrectly deserialized from Cassandra"); + + PersonId id = personMap2.keySet().iterator().next(); + + person = personCache2.get(id); + if (!person.equalsPrimitiveFields(personMap2.get(id))) + throw new RuntimeException("Person value was incorrectly deserialized from Cassandra"); + + person = personCache3.get(id); + if (!person.equals(personMap2.get(id))) + throw new RuntimeException("Person value was incorrectly deserialized from Cassandra"); + + LOGGER.info("Single operation read tests passed"); + + LOGGER.info("Running bulk operation read tests"); + + Map persons1 = personCache1.getAll(personMap1.keySet()); + if 
(!TestsHelper.checkPersonMapsEqual(persons1, personMap1, true)) + throw new RuntimeException("Persons values batch was incorrectly deserialized from Cassandra"); + + Map persons2 = personCache2.getAll(personMap2.keySet()); + if (!TestsHelper.checkPersonMapsEqual(persons2, personMap2, true)) + throw new RuntimeException("Person values batch was incorrectly deserialized from Cassandra"); + + Map persons3 = personCache3.getAll(personMap2.keySet()); + if (!TestsHelper.checkPersonMapsEqual(persons3, personMap2, false)) + throw new RuntimeException("Person values batch was incorrectly deserialized from Cassandra"); + + LOGGER.info("Bulk operation read tests passed"); + + LOGGER.info("POJO strategy read tests passed"); + + LOGGER.info("Running POJO strategy delete tests"); + + personCache1.remove(1L); + personCache1.removeAll(personMap1.keySet()); + + personCache2.remove(id); + personCache2.removeAll(personMap2.keySet()); + + personCache3.remove(id); + personCache3.removeAll(personMap2.keySet()); + + LOGGER.info("POJO strategy delete tests passed"); + } + } + + /** */ + @Test + public void loadCacheTest() { + Ignition.stopAll(true); + + LOGGER.info("Running loadCache test"); + + LOGGER.info("Filling Cassandra table with test data"); + + CacheStore store = CacheStoreHelper.createCacheStore("personTypes", + new ClassPathResource("org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml"), + CassandraHelper.getAdminDataSrc()); + + Collection> entries = TestsHelper.generatePersonIdsPersonsEntries(); + + store.writeAll(entries); + + LOGGER.info("Cassandra table filled with test data"); + + LOGGER.info("Running loadCache test"); + + try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) { + IgniteCache personCache3 = ignite.getOrCreateCache(new CacheConfiguration("cache3")); + int size = personCache3.size(CachePeekMode.ALL); + + LOGGER.info("Initial cache size " + size); + + LOGGER.info("Loading cache data from Cassandra 
table"); + + personCache3.loadCache(null, new String[] {"select * from test1.pojo_test3 limit 3"}); + + size = personCache3.size(CachePeekMode.ALL); + if (size != 3) { + throw new RuntimeException("Cache data was incorrectly loaded from Cassandra. " + + "Expected number of records is 3, but loaded number of records is " + size); + } + + LOGGER.info("Cache data loaded from Cassandra table"); + } + + LOGGER.info("loadCache test passed"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/LoadTestsCassandraArtifactsCreator.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/LoadTestsCassandraArtifactsCreator.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/LoadTestsCassandraArtifactsCreator.java new file mode 100644 index 0000000..4fdb96f --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/LoadTestsCassandraArtifactsCreator.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tests; + +import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; +import org.apache.ignite.tests.utils.CassandraHelper; +import org.apache.ignite.tests.utils.TestsHelper; + +import java.util.LinkedList; +import java.util.List; + +/** + * Recreates all required Cassandra database objects (keyspace, table, indexes) for load tests + */ +public class LoadTestsCassandraArtifactsCreator { + /** + * Recreates Cassandra artifacts required for load tests + * @param args not used + */ + public static void main(String[] args) { + try { + System.out.println("[INFO] Recreating Cassandra artifacts (keyspace, table, indexes) for load tests"); + + KeyValuePersistenceSettings perSettings = + new KeyValuePersistenceSettings(TestsHelper.getLoadTestsPersistenceSettings()); + + System.out.println("[INFO] Dropping test keyspace: " + perSettings.getKeyspace()); + + try { + CassandraHelper.dropTestKeyspaces(); + } catch (Throwable e) { + throw new RuntimeException("Failed to drop test keyspace: " + perSettings.getKeyspace(), e); + } + + System.out.println("[INFO] Test keyspace '" + perSettings.getKeyspace() + "' was successfully dropped"); + + System.out.println("[INFO] Creating test keyspace: " + perSettings.getKeyspace()); + + try { + CassandraHelper.executeWithAdminCredentials(perSettings.getKeyspaceDDLStatement()); + } catch (Throwable e) { + throw new RuntimeException("Failed to create test keyspace: " + perSettings.getKeyspace(), e); + } + + System.out.println("[INFO] Test keyspace '" + perSettings.getKeyspace() + "' was successfully created"); + + System.out.println("[INFO] Creating test table: " + perSettings.getTable()); + + try { + CassandraHelper.executeWithAdminCredentials(perSettings.getTableDDLStatement()); + } catch (Throwable e) { + throw new RuntimeException("Failed to create test table: " + perSettings.getTable(), e); + } + + System.out.println("[INFO] Test table '" + perSettings.getTable() + "' was 
successfully created"); + + List statements = perSettings.getIndexDDLStatements(); + if (statements == null) + statements = new LinkedList<>(); + + for (String statement : statements) { + System.out.println("[INFO] Creating test table index:"); + System.out.println(statement); + + try { + CassandraHelper.executeWithAdminCredentials(statement); + } catch (Throwable e) { + throw new RuntimeException("Failed to create test table index", e); + } + + System.out.println("[INFO] Test table index was successfully created"); + } + + System.out.println("[INFO] All required Cassandra artifacts were successfully recreated"); + } + catch (Throwable e) { + System.out.println("[ERROR] Failed to recreate Cassandra artifacts"); + e.printStackTrace(System.out); + + if (e instanceof RuntimeException) + throw (RuntimeException)e; + else + throw new RuntimeException(e); + } + finally { + CassandraHelper.releaseCassandraResources(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/Generator.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/Generator.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/Generator.java new file mode 100644 index 0000000..0c18bc0 --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/Generator.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.load; + +/** + * Generator abstraction which could be used by tests to generate next key/value pair for Ignite cache + * from provided int number (which sequentially incremented in load test driver loop). + */ +public interface Generator { + /** */ + public Object generate(long i); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/IntGenerator.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/IntGenerator.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/IntGenerator.java new file mode 100644 index 0000000..a31abee --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/IntGenerator.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.load; + +/** + * Implementation of {@link org.apache.ignite.tests.load.Generator} generating {@link Integer} instance. + */ +public class IntGenerator implements Generator { + /** {@inheritDoc} */ + @Override public Object generate(long i) { + long val = i / 10000; + + while (val > Integer.MAX_VALUE) + val = val / 2; + + return (int)val; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/LoadTestDriver.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/LoadTestDriver.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/LoadTestDriver.java new file mode 100644 index 0000000..296839d --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/LoadTestDriver.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.load; + +import java.lang.reflect.Constructor; +import java.util.LinkedList; +import java.util.List; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.cassandra.common.SystemHelper; +import org.apache.ignite.tests.utils.TestsHelper; +import org.apache.log4j.Logger; + +/** + * Basic load test driver to be inherited by specific implementation for particular use-case. + */ +public abstract class LoadTestDriver { + /** Number of attempts to setup load test */ + private static final int NUMBER_OF_SETUP_ATTEMPTS = 10; + + /** Timeout between load test setup attempts */ + private static final int SETUP_ATTEMPT_TIMEOUT = 1000; + + /** */ + public void runTest(String testName, Class clazz, String logName) { + logger().info("Running " + testName + " test"); + + Object cfg = null; + + int attempt; + + logger().info("Setting up load tests driver"); + + for (attempt = 0; attempt < NUMBER_OF_SETUP_ATTEMPTS; attempt++) { + try { + cfg = setup(logName); + break; + } + catch (Throwable e) { + logger().error((attempt + 1) + " attempt to setup load test '" + testName + "' failed", e); + } + + if (attempt + 1 != NUMBER_OF_SETUP_ATTEMPTS) { + logger().info("Sleeping for " + SETUP_ATTEMPT_TIMEOUT + " seconds before trying next attempt " + + "to setup '" + testName + "' load test"); + + try { + Thread.sleep(SETUP_ATTEMPT_TIMEOUT); + } + catch (InterruptedException ignored) { + // No-op. 
+ } + } + } + + if (cfg == null && attempt == NUMBER_OF_SETUP_ATTEMPTS) { + throw new RuntimeException("All " + NUMBER_OF_SETUP_ATTEMPTS + " attempts to setup load test '" + + testName+ "' have failed"); + } + + // calculates host unique prefix based on its subnet IP address + long hostUniqePrefix = getHostUniquePrefix(); + + logger().info("Load tests driver setup successfully completed"); + + try { + + List workers = new LinkedList<>(); + long startPosition = 0; + + logger().info("Starting workers"); + + for (int i = 0; i < TestsHelper.getLoadTestsThreadsCount(); i++) { + Worker worker = createWorker(clazz, cfg, + hostUniqePrefix + startPosition, + hostUniqePrefix + startPosition + 100000000); + workers.add(worker); + worker.setName(testName + "-worker-" + i); + worker.start(); + startPosition += 100000001; + } + + logger().info("Workers started"); + logger().info("Waiting for workers to complete"); + + List failedWorkers = new LinkedList<>(); + + for (Worker worker : workers) { + boolean failed = false; + + try { + worker.join(); + } + catch (Throwable e) { + logger().error("Worker " + worker.getName() + " waiting interrupted", e); + failed = true; + } + + if (failed || worker.isFailed()) { + failedWorkers.add(worker.getName()); + logger().info("Worker " + worker.getName() + " execution failed"); + } + else + logger().info("Worker " + worker.getName() + " successfully completed"); + } + + printTestResultsHeader(testName, failedWorkers); + printTestResultsStatistics(testName, workers); + } + finally { + tearDown(cfg); + } + } + + /** */ + protected abstract Logger logger(); + + /** */ + protected abstract Object setup(String logName); + + /** */ + protected void tearDown(Object obj) { + } + + /** */ + @SuppressWarnings("unchecked") + private Worker createWorker(Class clazz, Object cfg, long startPosition, long endPosition) { + try { + Class cfgCls = cfg instanceof Ignite ? 
Ignite.class : CacheStore.class; + + Constructor ctor = clazz.getConstructor(cfgCls, long.class, long.class); + + return (Worker)ctor.newInstance(cfg, startPosition, endPosition); + } + catch (Throwable e) { + logger().error("Failed to instantiate worker of class '" + clazz.getName() + "'", e); + throw new RuntimeException("Failed to instantiate worker of class '" + clazz.getName() + "'", e); + } + } + + /** */ + private void printTestResultsHeader(String testName, List failedWorkers) { + if (failedWorkers.isEmpty()) { + logger().info(testName + " test execution successfully completed."); + return; + } + + if (failedWorkers.size() == TestsHelper.getLoadTestsThreadsCount()) { + logger().error(testName + " test execution totally failed."); + return; + } + + String strFailedWorkers = ""; + + for (String workerName : failedWorkers) { + if (!strFailedWorkers.isEmpty()) + strFailedWorkers += ", "; + + strFailedWorkers += workerName; + } + + logger().warn(testName + " test execution completed, but " + failedWorkers.size() + " of " + + TestsHelper.getLoadTestsThreadsCount() + " workers failed. Failed workers: " + strFailedWorkers); + } + + /** */ + @SuppressWarnings("StringBufferReplaceableByString") + private void printTestResultsStatistics(String testName, List workers) { + long cnt = 0; + long errCnt = 0; + long speed = 0; + + for (Worker worker : workers) { + cnt += worker.getMsgProcessed(); + errCnt += worker.getErrorsCount(); + speed += worker.getSpeed(); + } + + float errPercent = errCnt == 0 ? + 0 : + cnt + errCnt == 0 ? 
0 : (float)(errCnt * 100 ) / (float)(cnt + errCnt); + + StringBuilder builder = new StringBuilder(); + builder.append(SystemHelper.LINE_SEPARATOR); + builder.append("-------------------------------------------------"); + builder.append(SystemHelper.LINE_SEPARATOR); + builder.append(testName).append(" test statistics").append(SystemHelper.LINE_SEPARATOR); + builder.append(testName).append(" messages: ").append(cnt).append(SystemHelper.LINE_SEPARATOR); + builder.append(testName).append(" errors: ").append(errCnt).append(", "). + append(String.format("%.2f", errPercent).replace(",", ".")). + append("%").append(SystemHelper.LINE_SEPARATOR); + builder.append(testName).append(" speed: ").append(speed).append(" msg/sec").append(SystemHelper.LINE_SEPARATOR); + builder.append("-------------------------------------------------"); + + logger().info(builder.toString()); + } + + /** */ + private long getHostUniquePrefix() { + String[] parts = SystemHelper.HOST_IP.split("\\."); + + if (parts[2].equals("0")) + parts[2] = "777"; + + if (parts[3].equals("0")) + parts[3] = "777"; + + long part3 = Long.parseLong(parts[2]); + long part4 = Long.parseLong(parts[3]); + + if (part3 < 10) + part3 = part3 * 100; + else if (part4 < 100) + part3 = part3 * 10; + + if (part4 < 10) + part4 = part4 * 100; + else if (part4 < 100) + part4 = part4 * 10; + + return (part4 * 100000000000000L) + (part3 * 100000000000L) + Thread.currentThread().getId(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/LongGenerator.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/LongGenerator.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/LongGenerator.java new file mode 100644 index 0000000..0398f98 --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/LongGenerator.java 
@@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.load; + +/** + * Implementation of {@link org.apache.ignite.tests.load.Generator} generating {@link Long} instance. + */ +public class LongGenerator implements Generator { + /** {@inheritDoc} */ + @Override public Object generate(long i) { + return i; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/PersonGenerator.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/PersonGenerator.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/PersonGenerator.java new file mode 100644 index 0000000..0317320 --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/PersonGenerator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.load; + +import java.util.Date; +import java.util.LinkedList; +import java.util.List; +import org.apache.ignite.tests.pojos.Person; + +/** + * Implementation of {@link Generator} generating {@link Person} instance. + */ +public class PersonGenerator implements Generator { + /** */ + private static final Date DATE = new Date(); + + /** */ + private static final List PHONES = new LinkedList(){{ + add("1234567"); + add("7654321"); + add("1289054"); + }}; + + /** {@inheritDoc} */ + @Override public Object generate(long i) { + return new Person(Long.toString(i), Long.toString(i), (int)(i % 100), i % 2 == 0, i, i, DATE, PHONES); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/PersonIdGenerator.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/PersonIdGenerator.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/PersonIdGenerator.java new file mode 100644 index 0000000..a11e0d8 --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/PersonIdGenerator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.load; + +import org.apache.ignite.tests.pojos.PersonId; + +/** + * Implementation of {@link org.apache.ignite.tests.load.Generator} generating + * {@link org.apache.ignite.tests.pojos.PersonId} instance. + */ +public class PersonIdGenerator implements Generator { + /** {@inheritDoc} */ + @Override public Object generate(long i) { + return new PersonId(Long.toString(i), Long.toString(i), i); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/StringGenerator.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/StringGenerator.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/StringGenerator.java new file mode 100644 index 0000000..cfaf34a --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/StringGenerator.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.load; + +/** + * Implementation of {@link org.apache.ignite.tests.load.Generator} generating {@link String} instance. + */ +public class StringGenerator implements Generator { + /** {@inheritDoc} */ + @Override public Object generate(long i) { + return Long.toString(i); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/Worker.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/Worker.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/Worker.java new file mode 100644 index 0000000..f4bffc7 --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/Worker.java @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.load; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.cassandra.common.SystemHelper; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; +import org.apache.ignite.tests.utils.TestsHelper; +import org.apache.log4j.Logger; + +/** + * Worker thread abstraction to be inherited by specific load test implementation + */ +public abstract class Worker extends Thread { + /** */ + private static final SimpleDateFormat TIME_FORMATTER = new SimpleDateFormat("hh:mm:ss"); + + /** */ + private long testStartTime; + + /** */ + boolean warmup = TestsHelper.getLoadTestsWarmupPeriod() != 0; + + /** */ + private volatile long warmupStartTime = 0; + + /** */ + private volatile long warmupFinishTime = 0; + + /** */ + private volatile long startTime = 0; + + /** */ + private volatile long finishTime = 0; + + /** */ + private volatile long warmupMsgProcessed = 0; + + /** */ + private volatile long warmupSleepCnt = 0; + + /** */ + private volatile long msgProcessed = 0; + + /** */ + private volatile long msgFailed = 0; + + /** */ + private volatile long sleepCnt = 0; + + /** */ + private Throwable executionError; + + /** */ + private long statReportedTime; + + /** */ + private 
CacheStore cacheStore; + + /** Ignite instance supplied to the Ignite-based constructor; when it is non-null, run() obtains the load tests cache from it. */ + private Ignite ignite; + + /** Cache obtained in run() through ignite.getOrCreateCache(); handed to the IgniteCache-based process() overloads. */ + private IgniteCache igniteCache; + + /** Logger obtained in the constructors from the name returned by loggerName(). */ + private Logger log; + + /** Initial value of the key/value generation counter; execute() resets the counter to it once endPosition has been reached. */ + private long startPosition; + + /** Counter value at which execute() wraps the counter back to startPosition. */ + private long endPosition; + + /** Creates a worker that operates directly on the given cache store, generating keys and values from the counter range startPosition..endPosition. NOTE(review): calls the overridable loggerName() from the constructor, so implementations should not rely on subclass state that is initialised later. */ + public Worker(CacheStore cacheStore, long startPosition, long endPosition) { + this.cacheStore = cacheStore; + this.log = Logger.getLogger(loggerName()); + this.startPosition = startPosition; + this.endPosition = endPosition; + } + + /** Creates a worker that operates on a cache of the given Ignite instance (the cache itself is obtained later, in run()), generating keys and values from the counter range startPosition..endPosition. NOTE(review): calls the overridable loggerName() from the constructor. */ + public Worker(Ignite ignite, long startPosition, long endPosition) { + this.ignite = ignite; + this.log = Logger.getLogger(loggerName()); + this.startPosition = startPosition; + this.endPosition = endPosition; + } + + /** {@inheritDoc} Obtains the load tests cache when an Ignite instance was supplied, then runs execute(). Any Throwable is stored in executionError and rethrown wrapped in a RuntimeException; the completion report is logged in every case. */ + @SuppressWarnings("unchecked") + @Override public void run() { + try { + if (ignite != null) + igniteCache = ignite.getOrCreateCache(new CacheConfiguration(TestsHelper.getLoadTestsCacheName())); + + execute(); + } + catch (Throwable e) { + executionError = e; + throw new RuntimeException("Test execution abnormally terminated", e); + } + finally { + reportTestCompletion(); + } + } + + /** @return {@code true} if run() recorded an execution error. */ + public boolean isFailed() { + return executionError != null; + } + + /** @return Messages processed per second outside of warm-up, with the time spent in latency sleeps subtracted from the elapsed time; 0 if nothing was processed, and the plain message count if the computed duration is zero whole seconds. Uses the current time while finishTime is still unset. */ + public long getSpeed() { + if (msgProcessed == 0) + return 0; + + long finish = finishTime != 0 ? finishTime : System.currentTimeMillis(); + long duration = (finish - startTime - sleepCnt * TestsHelper.getLoadTestsRequestsLatency()) / 1000; + + return duration == 0 ? msgProcessed : msgProcessed / duration; + } + + /** @return Number of failed messages; failures that happen during warm-up are not included (see updateErrorMetrics). */ + public long getErrorsCount() { + return msgFailed; + } + + /** @return Failed messages as a percentage of processed plus failed messages; 0 if there were no failures. */ + public float getErrorsPercent() { + if (msgFailed == 0) + return 0; + + return msgProcessed + msgFailed == 0 ? 
0 : (float)(msgFailed * 100 ) / (float)(msgProcessed + msgFailed); + } + + /** @return Total number of processed messages, warm-up and measured period combined. */ + public long getMsgCountTotal() { + return warmupMsgProcessed + msgProcessed; + } + + /** @return Number of messages processed while the warm-up flag was set. */ + public long getWarmupMsgProcessed() { + return warmupMsgProcessed; + } + + /** @return Number of messages processed after the warm-up flag was cleared. */ + public long getMsgProcessed() { + return msgProcessed; + } + + /** @return Name of the logger this worker logs to; called from the constructors. */ + protected abstract String loggerName(); + + /** @return {@code true} if execute() should accumulate entries and call the batch process() overloads, {@code false} if it should call the single-entry overloads on every iteration. */ + protected abstract boolean batchMode(); + + /** Processes one entry against the cache store; called when batchMode() is false and a cache store was supplied. This default implementation always throws UnsupportedOperationException. */ + protected void process(CacheStore cacheStore, CacheEntryImpl entry) { + throw new UnsupportedOperationException("Single message processing is not supported"); + } + + /** Processes one key/value pair against the Ignite cache; called when batchMode() is false and no cache store was supplied. This default implementation always throws UnsupportedOperationException. */ + protected void process(IgniteCache cache, Object key, Object val) { + throw new UnsupportedOperationException("Single message processing is not supported"); + } + + /** Processes a batch of entries against the cache store; called when batchMode() is true and a cache store was supplied. This default implementation always throws UnsupportedOperationException. */ + protected void process(CacheStore cacheStore, Collection entries) { + throw new UnsupportedOperationException("Batch processing is not supported"); + } + + /** Processes a batch of key/value pairs against the Ignite cache; called when batchMode() is true and no cache store was supplied. This default implementation always throws UnsupportedOperationException. */ + protected void process(IgniteCache cache, Map map) { + throw new UnsupportedOperationException("Batch processing is not supported"); + } + + /** Main load loop. Until the warm-up period plus the execution time has elapsed it generates keys and values from a counter that wraps from endPosition back to startPosition, dispatches them one by one or in batches of TestsHelper.getBulkOperationSize(), switches from warm-up to measured mode when the warm-up period is over, and calls reportStatistics() on every iteration. Finish timestamps are set in the finally block. NOTE(review): in batch mode the key/value generated for the initial counter value before the loop is never added to a batch; batches start from the next counter value. */ + @SuppressWarnings("unchecked") + private void execute() throws InterruptedException { + testStartTime = System.currentTimeMillis(); + + log.info("Test execution started"); + + if (warmup) + log.info("Warm up period started"); + + warmupStartTime = warmup ? testStartTime : 0; + startTime = !warmup ? 
testStartTime : 0; + + statReportedTime = testStartTime; + + long cntr = startPosition; + Object key = TestsHelper.generateLoadTestsKey(cntr); + Object val = TestsHelper.generateLoadTestsValue(cntr); + List batchList = new ArrayList<>(TestsHelper.getBulkOperationSize()); + Map batchMap = new HashMap(TestsHelper.getBulkOperationSize()); + + int execTime = TestsHelper.getLoadTestsWarmupPeriod() + TestsHelper.getLoadTestsExecutionTime(); + + try { + while (true) { + if (System.currentTimeMillis() - testStartTime > execTime) + break; + + if (warmup && System.currentTimeMillis() - testStartTime > TestsHelper.getLoadTestsWarmupPeriod()) { + warmupFinishTime = System.currentTimeMillis(); + startTime = warmupFinishTime; + statReportedTime = warmupFinishTime; + warmup = false; + log.info("Warm up period completed"); + } + + if (!batchMode()) { + if (cacheStore != null) + doWork(new CacheEntryImpl(key, val)); + else + doWork(key, val); + } + else if (batchList.size() == TestsHelper.getBulkOperationSize() || + batchMap.size() == TestsHelper.getBulkOperationSize()) { + if (cacheStore != null) + doWork(batchList); + else + doWork(batchMap); + + batchMap.clear(); + batchList.clear(); + } + + if (cntr == endPosition) + cntr = startPosition; + else + cntr++; + + key = TestsHelper.generateLoadTestsKey(cntr); + val = TestsHelper.generateLoadTestsValue(cntr); + + if (batchMode()) { + if (cacheStore != null) + batchList.add(new CacheEntryImpl(key, val)); + else + batchMap.put(key, val); + } + + reportStatistics(); + } + } + finally { + warmupFinishTime = warmupFinishTime != 0 ? 
warmupFinishTime : System.currentTimeMillis(); + finishTime = System.currentTimeMillis(); + } + } + + /** Processes a single entry through the cache store and counts it as one processed message; any Throwable is logged and counted as one failure instead of being propagated. */ + private void doWork(CacheEntryImpl entry) { + try { + process(cacheStore, entry); + updateMetrics(1); + } + catch (Throwable e) { + log.error("Failed to perform single operation", e); + updateErrorMetrics(1); + } + } + + /** Processes a single key/value pair through the Ignite cache and counts it as one processed message; any Throwable is logged and counted as one failure instead of being propagated. */ + private void doWork(Object key, Object val) { + try { + process(igniteCache, key, val); + updateMetrics(1); + } + catch (Throwable e) { + log.error("Failed to perform single operation", e); + updateErrorMetrics(1); + } + } + + /** Processes a batch of entries through the cache store and counts entries.size() processed messages; on any Throwable the error is logged and the whole batch is counted as failed. */ + private void doWork(Collection entries) { + try { + process(cacheStore, entries); + updateMetrics(entries.size()); + } + catch (Throwable e) { + log.error("Failed to perform batch operation", e); + updateErrorMetrics(entries.size()); + } + } + + /** Processes a batch of key/value pairs through the Ignite cache and counts entries.size() processed messages; on any Throwable the error is logged and the whole batch is counted as failed. */ + private void doWork(Map entries) { + try { + process(igniteCache, entries); + updateMetrics(entries.size()); + } + catch (Throwable e) { + log.error("Failed to perform batch operation", e); + updateErrorMetrics(entries.size()); + } + } + + /** @return Messages processed per second during warm-up, computed like getSpeed() but from the warm-up counters and timestamps; 0 if nothing was processed during warm-up. */ + private long getWarmUpSpeed() { + if (warmupMsgProcessed == 0) + return 0; + + long finish = warmupFinishTime != 0 ? warmupFinishTime : System.currentTimeMillis(); + long duration = (finish - warmupStartTime - warmupSleepCnt * TestsHelper.getLoadTestsRequestsLatency()) / 1000; + + return duration == 0 ? warmupMsgProcessed : warmupMsgProcessed / duration; + } + + /** Adds itemsProcessed to the warm-up or the measured counter, then, if a request latency is configured, sleeps for that latency and increments the matching sleep counter. NOTE(review): every Throwable thrown by the sleep is ignored, which includes thread interruption - confirm that this is intended. */ + private void updateMetrics(int itemsProcessed) { + if (warmup) + warmupMsgProcessed += itemsProcessed; + else + msgProcessed += itemsProcessed; + + if (TestsHelper.getLoadTestsRequestsLatency() > 0) { + try { + Thread.sleep(TestsHelper.getLoadTestsRequestsLatency()); + + if (warmup) + warmupSleepCnt++; + else + sleepCnt++; + } + catch (Throwable ignored) { + } + } + } + + /** + * Adds failed items to the failure counter; failures during warm-up are not counted. + * + * @param itemsFailed Number of failed items. 
+ */ + private void updateErrorMetrics(int itemsFailed) { + if (!warmup) + msgFailed += itemsFailed; + } + + /** Logs progress (messages processed, speed, percent completed and, outside of warm-up, the error count and percentage) when at least 30 seconds have passed since the previous report; the percentage is capped at 100. NOTE(review): the elapsed time is cast to int before it is multiplied by 100, so the product may overflow on long runs - confirm expected run lengths. */ + private void reportStatistics() { + // statistics should be reported only every 30 seconds + if (System.currentTimeMillis() - statReportedTime < 30000) + return; + + statReportedTime = System.currentTimeMillis(); + + int completed = warmup ? + (int)(statReportedTime - warmupStartTime) * 100 / TestsHelper.getLoadTestsWarmupPeriod() : + (int)(statReportedTime - startTime) * 100 / TestsHelper.getLoadTestsExecutionTime(); + + if (completed > 100) + completed = 100; + + if (warmup) { + log.info("Warm up messages processed " + warmupMsgProcessed + ", " + + "speed " + getWarmUpSpeed() + " msg/sec, " + completed + "% completed"); + } + else { + log.info("Messages processed " + msgProcessed + ", " + + "speed " + getSpeed() + " msg/sec, " + completed + "% completed, " + + "errors " + msgFailed + " / " + String.format("%.2f", getErrorsPercent()).replace(",", ".") + "%"); + } + } + + /** Builds the final summary of the run (start and finish time, duration, warm-up figures when a warm-up period is configured, processed messages, speed and errors) and logs it at error level together with the recorded error if the run failed, otherwise at info level. */ + private void reportTestCompletion() { + StringBuilder builder = new StringBuilder(); + + if (executionError != null) + builder.append("Test execution abnormally terminated. "); + else + builder.append("Test execution successfully completed. 
"); + + builder.append("Statistics: ").append(SystemHelper.LINE_SEPARATOR); + builder.append("Start time: ").append(TIME_FORMATTER.format(testStartTime)).append(SystemHelper.LINE_SEPARATOR); + builder.append("Finish time: ").append(TIME_FORMATTER.format(finishTime)).append(SystemHelper.LINE_SEPARATOR); + builder.append("Duration: ").append((finishTime - testStartTime) / 1000).append(" sec") + .append(SystemHelper.LINE_SEPARATOR); + + if (TestsHelper.getLoadTestsWarmupPeriod() > 0) { + builder.append("Warm up period: ").append(TestsHelper.getLoadTestsWarmupPeriod() / 1000) + .append(" sec").append(SystemHelper.LINE_SEPARATOR); + builder.append("Warm up processed messages: ").append(warmupMsgProcessed).append(SystemHelper.LINE_SEPARATOR); + builder.append("Warm up processing speed: ").append(getWarmUpSpeed()) + .append(" msg/sec").append(SystemHelper.LINE_SEPARATOR); + } + + builder.append("Processed messages: ").append(msgProcessed).append(SystemHelper.LINE_SEPARATOR); + builder.append("Processing speed: ").append(getSpeed()).append(" msg/sec").append(SystemHelper.LINE_SEPARATOR); + builder.append("Errors: ").append(msgFailed).append(" / "). 
+ append(String.format("%.2f", getErrorsPercent()).replace(",", ".")).append("%"); + + if (executionError != null) + log.error(builder.toString(), executionError); + else + log.info(builder.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/cassandra/BulkReadWorker.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/cassandra/BulkReadWorker.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/cassandra/BulkReadWorker.java new file mode 100644 index 0000000..38f0db8 --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/cassandra/BulkReadWorker.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tests.load.cassandra; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; +import org.apache.ignite.tests.load.Worker; +import org.apache.ignite.tests.utils.TestsHelper; + +/** + * Cassandra direct load tests worker for the bulk read operation CacheStore.loadAll. + */ +public class BulkReadWorker extends Worker { + /** Name of the logger used by this worker; returned by loggerName(). */ + public static final String LOGGER_NAME = "CassandraBulkReadLoadTest"; + + /** Reusable key buffer, pre-sized to TestsHelper.getBulkOperationSize(); cleared and refilled on every process() call. */ + private List keys = new ArrayList<>(TestsHelper.getBulkOperationSize()); + + /** Creates a bulk read worker; all arguments are passed unchanged to the cache store based Worker constructor. */ + public BulkReadWorker(CacheStore cacheStore, long startPosition, long endPosition) { + super(cacheStore, startPosition, endPosition); + } + + /** {@inheritDoc} Always returns LOGGER_NAME. */ + @Override protected String loggerName() { + return LOGGER_NAME; + } + + /** {@inheritDoc} Always {@code true}: this worker handles entries in batches. */ + @Override protected boolean batchMode() { + return true; + } + + /** {@inheritDoc} Collects the keys of the given entries into the reusable buffer and loads them with a single cacheStore.loadAll call; the loaded values are discarded. */ + @SuppressWarnings("unchecked") + @Override protected void process(CacheStore cacheStore, Collection entries) { + keys.clear(); + + for (CacheEntryImpl entry : entries) + keys.add(entry.getKey()); + + cacheStore.loadAll(keys); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/cassandra/BulkWriteWorker.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/cassandra/BulkWriteWorker.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/cassandra/BulkWriteWorker.java new file mode 100644 index 0000000..c71728f --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/cassandra/BulkWriteWorker.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.load.cassandra; + +import java.util.Collection; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; +import org.apache.ignite.tests.load.Worker; + +/** + * Cassandra direct load tests worker for the bulk write operation CacheStore.writeAll. + */ +public class BulkWriteWorker extends Worker { + /** Name of the logger used by this worker; returned by loggerName(). */ + public static final String LOGGER_NAME = "CassandraBulkWriteLoadTest"; + + /** Creates a bulk write worker; all arguments are passed unchanged to the cache store based Worker constructor. */ + public BulkWriteWorker(CacheStore cacheStore, long startPosition, long endPosition) { + super(cacheStore, startPosition, endPosition); + } + + /** {@inheritDoc} Always returns LOGGER_NAME. */ + @Override protected String loggerName() { + return LOGGER_NAME; + } + + /** {@inheritDoc} Always {@code true}: this worker handles entries in batches. */ + @Override protected boolean batchMode() { + return true; + } + + /** {@inheritDoc} Writes the whole batch of entries with a single cacheStore.writeAll call. */ + @SuppressWarnings("unchecked") + @Override protected void process(CacheStore cacheStore, Collection entries) { + cacheStore.writeAll(entries); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/cassandra/ReadWorker.java ---------------------------------------------------------------------- diff --git 
a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/cassandra/ReadWorker.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/cassandra/ReadWorker.java new file mode 100644 index 0000000..051b55f --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/cassandra/ReadWorker.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.tests.load.cassandra; + +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; +import org.apache.ignite.tests.load.Worker; + +/** + * Cassandra direct load tests worker for the single read operation CacheStore.load. + */ +public class ReadWorker extends Worker { + /** Name of the logger used by this worker; returned by loggerName(). */ + public static final String LOGGER_NAME = "CassandraReadLoadTest"; + + /** Creates a read worker; all arguments are passed unchanged to the cache store based Worker constructor. */ + public ReadWorker(CacheStore cacheStore, long startPosition, long endPosition) { + super(cacheStore, startPosition, endPosition); + } + + /** {@inheritDoc} Always returns LOGGER_NAME. */ + @Override protected String loggerName() { + return LOGGER_NAME; + } + + /** {@inheritDoc} Always {@code false}: this worker handles one entry per call. */ + @Override protected boolean batchMode() { + return false; + } + + /** {@inheritDoc} Loads the value for the key of the given entry through cacheStore.load; the value carried by the entry is not used and the loaded value is discarded. */ + @SuppressWarnings("unchecked") + @Override protected void process(CacheStore cacheStore, CacheEntryImpl entry) { + cacheStore.load(entry.getKey()); + } +}