ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [20/50] ignite git commit: IGNITE-1371 Implemented Cassandra cache store.
Date Mon, 20 Jun 2016 12:53:00 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
new file mode 100644
index 0000000..5da6ba2
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.tests.pojos.Person;
+import org.apache.ignite.tests.pojos.PersonId;
+import org.apache.ignite.tests.utils.CacheStoreHelper;
+import org.apache.ignite.tests.utils.CassandraHelper;
+import org.apache.ignite.tests.utils.TestsHelper;
+import org.apache.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.springframework.core.io.ClassPathResource;
+
+/**
+ * Unit tests for Ignite caches which utilize {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}
+ * to store cache data into Cassandra tables
+ */
+public class IgnitePersistentStoreTest {
+    /** */
+    private static final Logger LOGGER = Logger.getLogger(IgnitePersistentStoreTest.class.getName());
+
+    /** Starts embedded Cassandra if configured, checks admin and regular connections and drops keyspaces left by earlier runs. */
+    @BeforeClass
+    public static void setUpClass() {
+        if (CassandraHelper.useEmbeddedCassandra()) {
+            try {
+                CassandraHelper.startEmbeddedCassandra(LOGGER);
+            }
+            catch (Throwable e) {
+                throw new RuntimeException("Failed to start embedded Cassandra instance", e);
+            }
+        }
+
+        LOGGER.info("Testing admin connection to Cassandra");
+        CassandraHelper.testAdminConnection();
+
+        LOGGER.info("Testing regular connection to Cassandra");
+        CassandraHelper.testRegularConnection();
+
+        LOGGER.info("Dropping all artifacts from previous tests execution session");
+        CassandraHelper.dropTestKeyspaces();
+
+        LOGGER.info("Start tests execution");
+    }
+
+    /** Drops test keyspaces, then (even if that fails) releases Cassandra resources and stops embedded Cassandra; a stop failure is only logged. */
+    @AfterClass
+    public static void tearDownClass() {
+        try {
+            CassandraHelper.dropTestKeyspaces();
+        }
+        finally {
+            CassandraHelper.releaseCassandraResources();
+
+            if (CassandraHelper.useEmbeddedCassandra()) {
+                try {
+                    CassandraHelper.stopEmbeddedCassandra();
+                }
+                catch (Throwable e) {
+                    LOGGER.error("Failed to stop embedded Cassandra instance", e);
+                }
+            }
+        }
+    }
+
+    /** Writes Long and String entries using the primitive-strategy Ignite config, restarts Ignite, then reads the entries back, verifies them and removes them. */
+    @Test
+    public void primitiveStrategyTest() {
+        Ignition.stopAll(true);
+
+        Map<Long, Long> longMap = TestsHelper.generateLongsMap();
+        Map<String, String> strMap = TestsHelper.generateStringsMap();
+
+        LOGGER.info("Running PRIMITIVE strategy write tests");
+
+        try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/primitive/ignite-config.xml")) {
+            IgniteCache<Long, Long> longCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Long>("cache1"));
+            IgniteCache<String, String> strCache = ignite.getOrCreateCache(new CacheConfiguration<String, String>("cache2"));
+
+            LOGGER.info("Running single operation write tests");
+            longCache.put(1L, 1L);
+            strCache.put("1", "1");
+            LOGGER.info("Single operation write tests passed");
+
+            LOGGER.info("Running bulk operation write tests");
+            longCache.putAll(longMap);
+            strCache.putAll(strMap);
+            LOGGER.info("Bulk operation write tests passed");
+        }
+
+        LOGGER.info("PRIMITIVE strategy write tests passed");
+
+        Ignition.stopAll(true);
+
+        try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/primitive/ignite-config.xml")) {
+            LOGGER.info("Running PRIMITIVE strategy read tests");
+
+            IgniteCache<Long, Long> longCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Long>("cache1"));
+            IgniteCache<String, String> strCache = ignite.getOrCreateCache(new CacheConfiguration<String, String>("cache2"));
+
+            LOGGER.info("Running single operation read tests");
+
+            Long longVal = longCache.get(1L);
+            if (!longVal.equals(longMap.get(1L)))
+                throw new RuntimeException("Long value was incorrectly deserialized from Cassandra");
+
+            String strVal = strCache.get("1");
+            if (!strVal.equals(strMap.get("1")))
+                throw new RuntimeException("String value was incorrectly deserialized from Cassandra");
+
+            LOGGER.info("Single operation read tests passed");
+
+            LOGGER.info("Running bulk operation read tests");
+
+            Map<Long, Long> longMap1 = longCache.getAll(longMap.keySet());
+            if (!TestsHelper.checkMapsEqual(longMap, longMap1))
+                throw new RuntimeException("Long values batch was incorrectly deserialized from Cassandra");
+
+            Map<String, String> strMap1 = strCache.getAll(strMap.keySet());
+            if (!TestsHelper.checkMapsEqual(strMap, strMap1))
+                throw new RuntimeException("String values batch was incorrectly deserialized from Cassandra");
+
+            LOGGER.info("Bulk operation read tests passed");
+
+            LOGGER.info("PRIMITIVE strategy read tests passed");
+
+            LOGGER.info("Running PRIMITIVE strategy delete tests");
+
+            longCache.remove(1L);
+            longCache.removeAll(longMap.keySet());
+
+            strCache.remove("1");
+            strCache.removeAll(strMap.keySet());
+
+            LOGGER.info("PRIMITIVE strategy delete tests passed");
+        }
+    }
+
+    /** Writes Long and Person entries using the blob-strategy Ignite config, restarts Ignite, then reads the entries back, verifies them and removes them. */
+    @Test
+    public void blobStrategyTest() {
+        Ignition.stopAll(true);
+
+        Map<Long, Long> longMap = TestsHelper.generateLongsMap();
+        Map<Long, Person> personMap = TestsHelper.generateLongsPersonsMap();
+
+        LOGGER.info("Running BLOB strategy write tests");
+
+        try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/blob/ignite-config.xml")) {
+            IgniteCache<Long, Long> longCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Long>("cache1"));
+            IgniteCache<Long, Person> personCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Person>("cache2"));
+
+            LOGGER.info("Running single operation write tests");
+            longCache.put(1L, 1L);
+            personCache.put(1L, TestsHelper.generateRandomPerson());
+            LOGGER.info("Single operation write tests passed");
+
+            LOGGER.info("Running bulk operation write tests");
+            longCache.putAll(longMap);
+            personCache.putAll(personMap);
+            LOGGER.info("Bulk operation write tests passed");
+        }
+
+        LOGGER.info("BLOB strategy write tests passed");
+
+        Ignition.stopAll(true);
+
+        try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/blob/ignite-config.xml")) {
+            LOGGER.info("Running BLOB strategy read tests");
+
+            IgniteCache<Long, Long> longCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Long>("cache1"));
+            IgniteCache<Long, Person> personCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Person>("cache2"));
+
+            LOGGER.info("Running single operation read tests");
+
+            Long longVal = longCache.get(1L);
+            if (!longVal.equals(longMap.get(1L)))
+                throw new RuntimeException("Long value was incorrectly deserialized from Cassandra");
+
+            Person person = personCache.get(1L);
+            if (!person.equals(personMap.get(1L)))
+                throw new RuntimeException("Person value was incorrectly deserialized from Cassandra");
+
+            LOGGER.info("Single operation read tests passed");
+
+            LOGGER.info("Running bulk operation read tests");
+
+            Map<Long, Long> longMap1 = longCache.getAll(longMap.keySet());
+            if (!TestsHelper.checkMapsEqual(longMap, longMap1))
+                throw new RuntimeException("Long values batch was incorrectly deserialized from Cassandra");
+
+            Map<Long, Person> personMap1 = personCache.getAll(personMap.keySet());
+            if (!TestsHelper.checkPersonMapsEqual(personMap, personMap1, false))
+                throw new RuntimeException("Person values batch was incorrectly deserialized from Cassandra");
+
+            LOGGER.info("Bulk operation read tests passed");
+
+            LOGGER.info("BLOB strategy read tests passed");
+
+            LOGGER.info("Running BLOB strategy delete tests");
+
+            longCache.remove(1L);
+            longCache.removeAll(longMap.keySet());
+
+            personCache.remove(1L);
+            personCache.removeAll(personMap.keySet());
+
+            LOGGER.info("BLOB strategy delete tests passed");
+        }
+    }
+
+    /** Writes Person entries keyed by Long and by PersonId into three caches using the POJO-strategy Ignite config, restarts Ignite, then reads, verifies and removes them. */
+    @Test
+    public void pojoStrategyTest() {
+        Ignition.stopAll(true);
+
+        LOGGER.info("Running POJO strategy write tests");
+
+        Map<Long, Person> personMap1 = TestsHelper.generateLongsPersonsMap();
+        Map<PersonId, Person> personMap2 = TestsHelper.generatePersonIdsPersonsMap();
+
+        try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) {
+            IgniteCache<Long, Person> personCache1 = ignite.getOrCreateCache(new CacheConfiguration<Long, Person>("cache1"));
+            IgniteCache<PersonId, Person> personCache2 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache2"));
+            IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache3"));
+
+            LOGGER.info("Running single operation write tests");
+            personCache1.put(1L, TestsHelper.generateRandomPerson());
+            personCache2.put(TestsHelper.generateRandomPersonId(), TestsHelper.generateRandomPerson());
+            personCache3.put(TestsHelper.generateRandomPersonId(), TestsHelper.generateRandomPerson());
+            LOGGER.info("Single operation write tests passed");
+
+            LOGGER.info("Running bulk operation write tests");
+            personCache1.putAll(personMap1);
+            personCache2.putAll(personMap2);
+            personCache3.putAll(personMap2);
+            LOGGER.info("Bulk operation write tests passed");
+        }
+
+        LOGGER.info("POJO strategy write tests passed");
+
+        Ignition.stopAll(true);
+
+        try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) {
+            LOGGER.info("Running POJO strategy read tests");
+
+            IgniteCache<Long, Person> personCache1 = ignite.getOrCreateCache(new CacheConfiguration<Long, Person>("cache1"));
+            IgniteCache<PersonId, Person> personCache2 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache2"));
+            IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache3"));
+
+            LOGGER.info("Running single operation read tests");
+            Person person = personCache1.get(1L);
+            if (!person.equalsPrimitiveFields(personMap1.get(1L)))
+                throw new RuntimeException("Person value was incorrectly deserialized from Cassandra");
+
+            PersonId id = personMap2.keySet().iterator().next();
+
+            person = personCache2.get(id);
+            if (!person.equalsPrimitiveFields(personMap2.get(id)))
+                throw new RuntimeException("Person value was incorrectly deserialized from Cassandra");
+
+            person = personCache3.get(id);
+            if (!person.equals(personMap2.get(id)))
+                throw new RuntimeException("Person value was incorrectly deserialized from Cassandra");
+
+            LOGGER.info("Single operation read tests passed");
+
+            LOGGER.info("Running bulk operation read tests");
+
+            Map<Long, Person> persons1 = personCache1.getAll(personMap1.keySet());
+            if (!TestsHelper.checkPersonMapsEqual(persons1, personMap1, true))
+                throw new RuntimeException("Persons values batch was incorrectly deserialized from Cassandra");
+
+            Map<PersonId, Person> persons2 = personCache2.getAll(personMap2.keySet());
+            if (!TestsHelper.checkPersonMapsEqual(persons2, personMap2, true))
+                throw new RuntimeException("Person values batch was incorrectly deserialized from Cassandra");
+
+            Map<PersonId, Person> persons3 = personCache3.getAll(personMap2.keySet());
+            if (!TestsHelper.checkPersonMapsEqual(persons3, personMap2, false))
+                throw new RuntimeException("Person values batch was incorrectly deserialized from Cassandra");
+
+            LOGGER.info("Bulk operation read tests passed");
+
+            LOGGER.info("POJO strategy read tests passed");
+
+            LOGGER.info("Running POJO strategy delete tests");
+
+            personCache1.remove(1L);
+            personCache1.removeAll(personMap1.keySet());
+
+            personCache2.remove(id);
+            personCache2.removeAll(personMap2.keySet());
+
+            personCache3.remove(id);
+            personCache3.removeAll(personMap2.keySet());
+
+            LOGGER.info("POJO strategy delete tests passed");
+        }
+    }
+
+    /** Writes generated PersonId/Person entries directly through a cache store, then checks that loadCache with a "limit 3" query leaves exactly 3 entries in "cache3". */
+    @Test
+    public void loadCacheTest() {
+        Ignition.stopAll(true);
+
+        LOGGER.info("Running loadCache test");
+
+        LOGGER.info("Filling Cassandra table with test data");
+
+        CacheStore store = CacheStoreHelper.createCacheStore("personTypes",
+            new ClassPathResource("org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml"),
+            CassandraHelper.getAdminDataSrc());
+
+        Collection<CacheEntryImpl<PersonId, Person>> entries = TestsHelper.generatePersonIdsPersonsEntries();
+
+        store.writeAll(entries);
+
+        LOGGER.info("Cassandra table filled with test data");
+
+        LOGGER.info("Running loadCache test");
+
+        try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) {
+            IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache3"));
+            int size = personCache3.size(CachePeekMode.ALL);
+
+            LOGGER.info("Initial cache size " + size);
+
+            LOGGER.info("Loading cache data from Cassandra table");
+
+            personCache3.loadCache(null, new String[] {"select * from test1.pojo_test3 limit 3"});
+
+            size = personCache3.size(CachePeekMode.ALL);
+            if (size != 3) {
+                throw new RuntimeException("Cache data was incorrectly loaded from Cassandra. " +
+                    "Expected number of records is 3, but loaded number of records is " + size);
+            }
+
+            LOGGER.info("Cache data loaded from Cassandra table");
+        }
+
+        LOGGER.info("loadCache test passed");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/Generator.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/Generator.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/Generator.java
new file mode 100644
index 0000000..0c18bc0
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/Generator.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load;
+
+/**
+ * Generator abstraction which could be used by tests to generate the next key/value object for an Ignite cache
+ * from the provided long number (which is sequentially incremented in the load test driver loop).
+ */
+public interface Generator {
+    /** Generates an object from the given sequence number {@code i}. */
+    public Object generate(long i);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/IntGenerator.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/IntGenerator.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/IntGenerator.java
new file mode 100644
index 0000000..a31abee
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/IntGenerator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load;
+
+/**
+ * Implementation of {@link org.apache.ignite.tests.load.Generator} generating an {@link Integer}: the input divided by 10000, halved until it fits into int range.
+ */
+public class IntGenerator implements Generator {
+    /** {@inheritDoc} */
+    @Override public Object generate(long i) {
+        long val = i / 10000;
+
+        while (val > Integer.MAX_VALUE)
+            val = val / 2;
+
+        return (int)val;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/LoadTestDriver.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/LoadTestDriver.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/LoadTestDriver.java
new file mode 100644
index 0000000..296839d
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/LoadTestDriver.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load;
+
+import java.lang.reflect.Constructor;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.cassandra.common.SystemHelper;
+import org.apache.ignite.tests.utils.TestsHelper;
+import org.apache.log4j.Logger;
+
+/**
+ * Basic load test driver to be inherited by specific implementation for particular use-case.
+ */
+public abstract class LoadTestDriver {
+    /** Number of attempts to setup load test */
+    private static final int NUMBER_OF_SETUP_ATTEMPTS = 10;
+
+    /** Timeout between load test setup attempts, in milliseconds */
+    private static final int SETUP_ATTEMPT_TIMEOUT = 1000;
+
+    /** Runs a load test: retries setup up to NUMBER_OF_SETUP_ATTEMPTS times, starts one worker of class {@code clazz} per configured thread, waits for them, logs results and always calls tearDown. */
+    public void runTest(String testName, Class<? extends Worker> clazz, String logName) {
+        logger().info("Running " + testName + " test");
+
+        Object cfg = null;
+
+        int attempt;
+
+        logger().info("Setting up load tests driver");
+
+        for (attempt = 0; attempt < NUMBER_OF_SETUP_ATTEMPTS; attempt++) {
+            try {
+                cfg = setup(logName);
+                break;
+            }
+            catch (Throwable e) {
+                logger().error((attempt + 1) + " attempt to setup load test '" + testName + "' failed", e);
+            }
+
+            if (attempt + 1 != NUMBER_OF_SETUP_ATTEMPTS) {
+                logger().info("Sleeping for " + SETUP_ATTEMPT_TIMEOUT + " milliseconds before trying next attempt " +
+                        "to setup '" + testName + "' load test");
+
+                try {
+                    Thread.sleep(SETUP_ATTEMPT_TIMEOUT);
+                }
+                catch (InterruptedException ignored) {
+                    // No-op.
+                }
+            }
+        }
+
+        if (cfg == null && attempt == NUMBER_OF_SETUP_ATTEMPTS) {
+            throw new RuntimeException("All " + NUMBER_OF_SETUP_ATTEMPTS + " attempts to setup load test '" +
+                    testName+ "' have failed");
+        }
+
+        // calculates host unique prefix based on its subnet IP address
+        long hostUniqePrefix = getHostUniquePrefix();
+
+        logger().info("Load tests driver setup successfully completed");
+
+        try {
+
+            List<Worker> workers = new LinkedList<>();
+            long startPosition = 0;
+
+            logger().info("Starting workers");
+
+            for (int i = 0; i < TestsHelper.getLoadTestsThreadsCount(); i++) {
+                Worker worker = createWorker(clazz, cfg,
+                    hostUniqePrefix + startPosition,
+                    hostUniqePrefix + startPosition + 100000000);
+                workers.add(worker);
+                worker.setName(testName + "-worker-" + i);
+                worker.start();
+                startPosition += 100000001; // next worker's range starts right after the previous one's end position
+            }
+
+            logger().info("Workers started");
+            logger().info("Waiting for workers to complete");
+
+            List<String> failedWorkers = new LinkedList<>();
+
+            for (Worker worker : workers) {
+                boolean failed = false;
+
+                try {
+                    worker.join();
+                }
+                catch (Throwable e) {
+                    logger().error("Worker " + worker.getName() + " waiting interrupted", e);
+                    failed = true;
+                }
+
+                if (failed || worker.isFailed()) {
+                    failedWorkers.add(worker.getName());
+                    logger().info("Worker " + worker.getName() + " execution failed");
+                }
+                else
+                    logger().info("Worker " + worker.getName() + " successfully completed");
+            }
+
+            printTestResultsHeader(testName, failedWorkers);
+            printTestResultsStatistics(testName, workers);
+        }
+        finally {
+            tearDown(cfg);
+        }
+    }
+
+    /** Returns the logger this driver uses for progress, error and result messages. */
+    protected abstract Logger logger();
+
+    /** Prepares the test and returns the configuration object handed to workers and to tearDown; a thrown error makes runTest retry. */
+    protected abstract Object setup(String logName);
+
+    /** Cleanup hook called by runTest in a finally block with the object returned by setup; does nothing by default. */
+    protected void tearDown(Object obj) {
+    }
+
+    /** Reflectively instantiates {@code clazz} via its (Ignite or CacheStore, long, long) constructor, chosen by the type of {@code cfg}; any failure is logged and rethrown as RuntimeException. */
+    @SuppressWarnings("unchecked")
+    private Worker createWorker(Class clazz, Object cfg, long startPosition, long endPosition) {
+        try {
+            Class cfgCls = cfg instanceof Ignite ? Ignite.class : CacheStore.class;
+
+            Constructor ctor = clazz.getConstructor(cfgCls, long.class, long.class);
+
+            return (Worker)ctor.newInstance(cfg, startPosition, endPosition);
+        }
+        catch (Throwable e) {
+            logger().error("Failed to instantiate worker of class '" + clazz.getName() + "'", e);
+            throw new RuntimeException("Failed to instantiate worker of class '" + clazz.getName() + "'", e);
+        }
+    }
+
+    /** Logs the overall outcome: success if no worker failed, total failure if all configured workers failed, otherwise a warning listing the failed worker names. */
+    private void printTestResultsHeader(String testName, List<String> failedWorkers) {
+        if (failedWorkers.isEmpty()) {
+            logger().info(testName + " test execution successfully completed.");
+            return;
+        }
+
+        if (failedWorkers.size() == TestsHelper.getLoadTestsThreadsCount()) {
+            logger().error(testName + " test execution totally failed.");
+            return;
+        }
+
+        String strFailedWorkers = "";
+
+        for (String workerName : failedWorkers) {
+            if (!strFailedWorkers.isEmpty())
+                strFailedWorkers += ", ";
+
+            strFailedWorkers += workerName;
+        }
+
+        logger().warn(testName + " test execution completed, but " + failedWorkers.size() + " of " +
+            TestsHelper.getLoadTestsThreadsCount() + " workers failed. Failed workers: " + strFailedWorkers);
+    }
+
+    /** Sums processed messages, errors and speed over all workers and logs them together with the error percentage, errors / (messages + errors). */
+    @SuppressWarnings("StringBufferReplaceableByString")
+    private void printTestResultsStatistics(String testName, List<Worker> workers) {
+        long cnt = 0;
+        long errCnt = 0;
+        long speed = 0;
+
+        for (Worker worker : workers) {
+            cnt += worker.getMsgProcessed();
+            errCnt += worker.getErrorsCount();
+            speed += worker.getSpeed();
+        }
+
+        float errPercent = errCnt == 0 ?
+            0 :
+            cnt + errCnt ==  0 ? 0 : (float)(errCnt * 100 ) / (float)(cnt + errCnt);
+
+        StringBuilder builder = new StringBuilder();
+        builder.append(SystemHelper.LINE_SEPARATOR);
+        builder.append("-------------------------------------------------");
+        builder.append(SystemHelper.LINE_SEPARATOR);
+        builder.append(testName).append(" test statistics").append(SystemHelper.LINE_SEPARATOR);
+        builder.append(testName).append(" messages: ").append(cnt).append(SystemHelper.LINE_SEPARATOR);
+        builder.append(testName).append(" errors: ").append(errCnt).append(", ").
+                append(String.format("%.2f", errPercent).replace(",", ".")).
+                append("%").append(SystemHelper.LINE_SEPARATOR);
+        builder.append(testName).append(" speed: ").append(speed).append(" msg/sec").append(SystemHelper.LINE_SEPARATOR);
+        builder.append("-------------------------------------------------");
+
+        logger().info(builder.toString());
+    }
+
+    /** Builds a numeric prefix from the 3rd and 4th dot-separated parts of SystemHelper.HOST_IP (a "0" part becomes 777, each part is scaled to three digits) plus the current thread id. */
+    private long getHostUniquePrefix() {
+        String[] parts = SystemHelper.HOST_IP.split("\\."); // NOTE(review): assumes a dotted address with at least four parts - confirm
+
+        if (parts[2].equals("0"))
+            parts[2] = "777";
+
+        if (parts[3].equals("0"))
+            parts[3] = "777";
+
+        long part3 = Long.parseLong(parts[2]);
+        long part4 = Long.parseLong(parts[3]);
+
+        if (part3 < 10)
+            part3 = part3 * 100;
+        else if (part3 < 100) // scale part3 by its own magnitude, mirroring the part4 branch below
+            part3 = part3 * 10;
+
+        if (part4 < 10)
+            part4 = part4 * 100;
+        else if (part4 < 100)
+            part4 = part4 * 10;
+
+        return (part4 * 100000000000000L) + (part3 * 100000000000L) + Thread.currentThread().getId();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/LongGenerator.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/LongGenerator.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/LongGenerator.java
new file mode 100644
index 0000000..0398f98
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/LongGenerator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load;
+
+/**
+ * Implementation of {@link org.apache.ignite.tests.load.Generator} generating a {@link Long} instance: the input value itself, boxed.
+ */
+public class LongGenerator implements Generator {
+    /** {@inheritDoc} */
+    @Override public Object generate(long i) {
+        return i;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/PersonGenerator.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/PersonGenerator.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/PersonGenerator.java
new file mode 100644
index 0000000..0317320
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/PersonGenerator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load;
+
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.ignite.tests.pojos.Person;
+
+/**
+ * {@link Generator} implementation that builds a {@link Person} from the supplied counter.
+ * <p>
+ * The same {@link #DATE} and {@link #PHONES} instances are passed to every generated person.
+ */
+public class PersonGenerator implements Generator {
+    /** Date handed to every generated person; captured once when this class is initialized. */
+    private static final Date DATE = new Date();
+
+    /** Phone numbers handed to every generated person. */
+    private static final List<String> PHONES = new LinkedList<>();
+
+    static {
+        // Filled in a static initializer instead of double-brace initialization, so the runtime class
+        // of the list is java.util.LinkedList itself and not an anonymous subclass of it.
+        PHONES.add("1234567");
+        PHONES.add("7654321");
+        PHONES.add("1289054");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object generate(long i) {
+        return new Person(Long.toString(i), Long.toString(i), (int)(i % 100), i % 2 == 0, i, i, DATE, PHONES);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/PersonIdGenerator.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/PersonIdGenerator.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/PersonIdGenerator.java
new file mode 100644
index 0000000..a11e0d8
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/PersonIdGenerator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load;
+
+import org.apache.ignite.tests.pojos.PersonId;
+
+/**
+ * Implementation of {@link org.apache.ignite.tests.load.Generator} generating
+ * {@link org.apache.ignite.tests.pojos.PersonId} instance.
+ */
+public class PersonIdGenerator implements Generator {
+    /** {@inheritDoc} */
+    @Override public Object generate(long i) {
+        return new PersonId(Long.toString(i), Long.toString(i), i);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/StringGenerator.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/StringGenerator.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/StringGenerator.java
new file mode 100644
index 0000000..cfaf34a
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/StringGenerator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load;
+
+/**
+ * {@link Generator} implementation that returns the decimal {@link String} form of the supplied counter.
+ */
+public class StringGenerator implements Generator {
+    /** {@inheritDoc} */
+    @Override public Object generate(long i) {
+        return String.valueOf(i);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/Worker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/Worker.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/Worker.java
new file mode 100644
index 0000000..f4bffc7
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/Worker.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.cassandra.common.SystemHelper;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.tests.utils.TestsHelper;
+import org.apache.log4j.Logger;
+
+/**
+ * Worker thread abstraction to be inherited by specific load test implementations.
+ * <p>
+ * A worker generates key/value pairs for a counter that cycles through {@code startPosition..endPosition}
+ * and hands them either to a {@link CacheStore} or to an {@link IgniteCache}, depending on which constructor
+ * was used. The run consists of an optional warm up period followed by the measured period; counters are
+ * kept separately for each of them.
+ * <p>
+ * Subclasses select single or batch processing through {@link #batchMode()} and override the matching
+ * {@code process(...)} method; the default implementations throw {@link UnsupportedOperationException}.
+ */
+public abstract class Worker extends Thread {
+    /** Pattern used to print start and finish time in the completion report. */
+    private static final String TIME_PATTERN = "hh:mm:ss";
+
+    /** Time (ms) at which the test loop was entered. */
+    private long testStartTime;
+
+    /** {@code true} while the warm up period is running; initially set only if a warm up period is configured. */
+    boolean warmup = TestsHelper.getLoadTestsWarmupPeriod() != 0;
+
+    /** Time (ms) at which the warm up period started, {@code 0} if there is no warm up. */
+    private volatile long warmupStartTime = 0;
+
+    /** Time (ms) at which the warm up period finished, {@code 0} while it has not finished. */
+    private volatile long warmupFinishTime = 0;
+
+    /** Time (ms) at which the measured period started, {@code 0} while warm up is still running. */
+    private volatile long startTime = 0;
+
+    /** Time (ms) at which the test loop was left, {@code 0} while it is still running. */
+    private volatile long finishTime = 0;
+
+    /** Number of items successfully processed during warm up. */
+    private volatile long warmupMsgProcessed = 0;
+
+    /** Number of latency pauses taken during warm up. */
+    private volatile long warmupSleepCnt = 0;
+
+    /** Number of items successfully processed during the measured period. */
+    private volatile long msgProcessed = 0;
+
+    /** Number of items whose processing failed during the measured period. */
+    private volatile long msgFailed = 0;
+
+    /** Number of latency pauses taken during the measured period. */
+    private volatile long sleepCnt = 0;
+
+    /** Error which terminated the test, {@code null} if there was none. */
+    private Throwable executionError;
+
+    /** Time (ms) of the last periodic statistics report. */
+    private long statReportedTime;
+
+    /** Cache store under test, {@code null} when the worker was created for an Ignite instance. */
+    private CacheStore cacheStore;
+
+    /** Ignite instance, {@code null} when the worker was created for a cache store. */
+    private Ignite ignite;
+
+    /** Ignite cache under test; obtained in {@link #run()} when {@link #ignite} is set. */
+    private IgniteCache igniteCache;
+
+    /** Logger obtained by the name returned from {@link #loggerName()}. */
+    private Logger log;
+
+    /** First counter value used to generate keys and values. */
+    private long startPosition;
+
+    /** Counter value after which the counter wraps back to {@link #startPosition}. */
+    private long endPosition;
+
+    /**
+     * Creates a worker that operates directly on a cache store.
+     * <p>
+     * Note that {@link #loggerName()} is invoked from this constructor, before subclass fields are initialized.
+     *
+     * @param cacheStore Cache store passed to the {@code process(CacheStore, ...)} methods.
+     * @param startPosition First counter value used to generate keys and values.
+     * @param endPosition Counter value after which the counter wraps back to {@code startPosition}.
+     */
+    public Worker(CacheStore cacheStore, long startPosition, long endPosition) {
+        this.cacheStore = cacheStore;
+        this.log = Logger.getLogger(loggerName());
+        this.startPosition = startPosition;
+        this.endPosition = endPosition;
+    }
+
+    /**
+     * Creates a worker that operates on an Ignite cache.
+     * <p>
+     * Note that {@link #loggerName()} is invoked from this constructor, before subclass fields are initialized.
+     *
+     * @param ignite Ignite instance from which the load tests cache is obtained in {@link #run()}.
+     * @param startPosition First counter value used to generate keys and values.
+     * @param endPosition Counter value after which the counter wraps back to {@code startPosition}.
+     */
+    public Worker(Ignite ignite, long startPosition, long endPosition) {
+        this.ignite = ignite;
+        this.log = Logger.getLogger(loggerName());
+        this.startPosition = startPosition;
+        this.endPosition = endPosition;
+    }
+
+    /**
+     * Obtains the Ignite cache (if the worker was created for an Ignite instance), runs the test loop and
+     * always logs the completion report. Any error is remembered and rethrown wrapped into a
+     * {@link RuntimeException}.
+     */
+    @SuppressWarnings("unchecked")
+    @Override public void run() {
+        try {
+            if (ignite != null)
+                igniteCache = ignite.getOrCreateCache(new CacheConfiguration(TestsHelper.getLoadTestsCacheName()));
+
+            execute();
+        }
+        catch (Throwable e) {
+            executionError = e;
+            throw new RuntimeException("Test execution abnormally terminated", e);
+        }
+        finally {
+            reportTestCompletion();
+        }
+    }
+
+    /**
+     * @return {@code true} if the test was terminated by an error.
+     */
+    public boolean isFailed() {
+        return executionError != null;
+    }
+
+    /**
+     * Calculates processing speed of the measured period. Time spent in latency pauses is excluded.
+     *
+     * @return Items processed per second, or the number of processed items if less than a second has elapsed.
+     */
+    public long getSpeed() {
+        if (msgProcessed == 0)
+            return 0;
+
+        long finish = finishTime != 0 ? finishTime : System.currentTimeMillis();
+        long duration = (finish - startTime - sleepCnt * TestsHelper.getLoadTestsRequestsLatency()) / 1000;
+
+        return duration == 0 ? msgProcessed : msgProcessed / duration;
+    }
+
+    /**
+     * @return Number of items whose processing failed during the measured period.
+     */
+    public long getErrorsCount() {
+        return msgFailed;
+    }
+
+    /**
+     * @return Share of failed items among all items handled during the measured period, in percent.
+     */
+    public float getErrorsPercent() {
+        if (msgFailed == 0)
+            return 0;
+
+        return msgProcessed + msgFailed == 0 ? 0 : (float)(msgFailed * 100 ) / (float)(msgProcessed + msgFailed);
+    }
+
+    /**
+     * @return Number of items successfully processed during warm up and the measured period together.
+     */
+    public long getMsgCountTotal() {
+        return warmupMsgProcessed + msgProcessed;
+    }
+
+    /**
+     * @return Number of items successfully processed during warm up.
+     */
+    public long getWarmupMsgProcessed() {
+        return warmupMsgProcessed;
+    }
+
+    /**
+     * @return Number of items successfully processed during the measured period.
+     */
+    public long getMsgProcessed() {
+        return msgProcessed;
+    }
+
+    /**
+     * @return Name of the logger this worker writes to. Called from the constructor.
+     */
+    protected abstract String loggerName();
+
+    /**
+     * @return {@code true} if entries should be accumulated and processed in batches,
+     *      {@code false} if each entry should be processed on its own.
+     */
+    protected abstract boolean batchMode();
+
+    /**
+     * Processes a single entry against a cache store. Not supported unless overridden.
+     *
+     * @param cacheStore Cache store.
+     * @param entry Entry to process.
+     */
+    protected void process(CacheStore cacheStore, CacheEntryImpl entry) {
+        throw new UnsupportedOperationException("Single message processing is not supported");
+    }
+
+    /**
+     * Processes a single key/value pair against an Ignite cache. Not supported unless overridden.
+     *
+     * @param cache Ignite cache.
+     * @param key Key.
+     * @param val Value.
+     */
+    protected void process(IgniteCache cache, Object key, Object val) {
+        throw new UnsupportedOperationException("Single message processing is not supported");
+    }
+
+    /**
+     * Processes a batch of entries against a cache store. Not supported unless overridden.
+     *
+     * @param cacheStore Cache store.
+     * @param entries Entries to process.
+     */
+    protected void process(CacheStore cacheStore, Collection<CacheEntryImpl> entries) {
+        throw new UnsupportedOperationException("Batch processing is not supported");
+    }
+
+    /**
+     * Processes a batch of key/value pairs against an Ignite cache. Not supported unless overridden.
+     *
+     * @param cache Ignite cache.
+     * @param map Key/value pairs to process.
+     */
+    protected void process(IgniteCache cache, Map map) {
+        throw new UnsupportedOperationException("Batch processing is not supported");
+    }
+
+    /**
+     * Test loop. Runs until the configured warm up period plus execution time has elapsed, switching from
+     * warm up to the measured period on the way. In batch mode entries are accumulated until the configured
+     * bulk operation size is reached and are then processed together.
+     *
+     * @throws InterruptedException Declared for callers; nothing in this method throws it at the moment.
+     */
+    @SuppressWarnings("unchecked")
+    private void execute() throws InterruptedException {
+        testStartTime = System.currentTimeMillis();
+
+        log.info("Test execution started");
+
+        if (warmup)
+            log.info("Warm up period started");
+
+        warmupStartTime = warmup ? testStartTime : 0;
+        startTime = !warmup ? testStartTime : 0;
+
+        statReportedTime = testStartTime;
+
+        long cntr = startPosition;
+        Object key = TestsHelper.generateLoadTestsKey(cntr);
+        Object val = TestsHelper.generateLoadTestsValue(cntr);
+        List<CacheEntryImpl> batchList = new ArrayList<>(TestsHelper.getBulkOperationSize());
+        Map batchMap = new HashMap(TestsHelper.getBulkOperationSize());
+
+        int execTime = TestsHelper.getLoadTestsWarmupPeriod() + TestsHelper.getLoadTestsExecutionTime();
+
+        try {
+            while (System.currentTimeMillis() - testStartTime <= execTime) {
+                // Switch from warm up to the measured period once the warm up time has elapsed.
+                if (warmup && System.currentTimeMillis() - testStartTime > TestsHelper.getLoadTestsWarmupPeriod()) {
+                    warmupFinishTime = System.currentTimeMillis();
+                    startTime = warmupFinishTime;
+                    statReportedTime = warmupFinishTime;
+                    warmup = false;
+                    log.info("Warm up period completed");
+                }
+
+                if (!batchMode()) {
+                    if (cacheStore != null)
+                        doWork(new CacheEntryImpl(key, val));
+                    else
+                        doWork(key, val);
+                }
+                else if (batchList.size() == TestsHelper.getBulkOperationSize() ||
+                    batchMap.size() == TestsHelper.getBulkOperationSize()) {
+                    if (cacheStore != null)
+                        doWork(batchList);
+                    else
+                        doWork(batchMap);
+
+                    batchMap.clear();
+                    batchList.clear();
+                }
+
+                // Advance the counter, wrapping around at the end of the assigned range.
+                if (cntr == endPosition)
+                    cntr = startPosition;
+                else
+                    cntr++;
+
+                key = TestsHelper.generateLoadTestsKey(cntr);
+                val = TestsHelper.generateLoadTestsValue(cntr);
+
+                if (batchMode()) {
+                    if (cacheStore != null)
+                        batchList.add(new CacheEntryImpl(key, val));
+                    else
+                        batchMap.put(key, val);
+                }
+
+                reportStatistics();
+            }
+        }
+        finally {
+            warmupFinishTime = warmupFinishTime != 0 ? warmupFinishTime : System.currentTimeMillis();
+            finishTime = System.currentTimeMillis();
+        }
+    }
+
+    /**
+     * Processes a single entry against the cache store, counting it as processed or failed.
+     *
+     * @param entry Entry to process.
+     */
+    private void doWork(CacheEntryImpl entry) {
+        try {
+            process(cacheStore, entry);
+            updateMetrics(1);
+        }
+        catch (Throwable e) {
+            log.error("Failed to perform single operation", e);
+            updateErrorMetrics(1);
+        }
+    }
+
+    /**
+     * Processes a single key/value pair against the Ignite cache, counting it as processed or failed.
+     *
+     * @param key Key.
+     * @param val Value.
+     */
+    private void doWork(Object key, Object val) {
+        try {
+            process(igniteCache, key, val);
+            updateMetrics(1);
+        }
+        catch (Throwable e) {
+            log.error("Failed to perform single operation", e);
+            updateErrorMetrics(1);
+        }
+    }
+
+    /**
+     * Processes a batch of entries against the cache store, counting all of them as processed or failed.
+     *
+     * @param entries Entries to process.
+     */
+    private void doWork(Collection<CacheEntryImpl> entries) {
+        try {
+            process(cacheStore, entries);
+            updateMetrics(entries.size());
+        }
+        catch (Throwable e) {
+            log.error("Failed to perform batch operation", e);
+            updateErrorMetrics(entries.size());
+        }
+    }
+
+    /**
+     * Processes a batch of key/value pairs against the Ignite cache, counting all of them as processed or failed.
+     *
+     * @param entries Key/value pairs to process.
+     */
+    private void doWork(Map entries) {
+        try {
+            process(igniteCache, entries);
+            updateMetrics(entries.size());
+        }
+        catch (Throwable e) {
+            log.error("Failed to perform batch operation", e);
+            updateErrorMetrics(entries.size());
+        }
+    }
+
+    /**
+     * Calculates processing speed of the warm up period. Time spent in latency pauses is excluded.
+     *
+     * @return Items processed per second, or the number of processed items if less than a second has elapsed.
+     */
+    private long getWarmUpSpeed() {
+        if (warmupMsgProcessed == 0)
+            return 0;
+
+        long finish = warmupFinishTime != 0 ? warmupFinishTime : System.currentTimeMillis();
+        long duration = (finish - warmupStartTime - warmupSleepCnt * TestsHelper.getLoadTestsRequestsLatency()) / 1000;
+
+        return duration == 0 ? warmupMsgProcessed : warmupMsgProcessed / duration;
+    }
+
+    /**
+     * Adds successfully processed items to the counter of the current period and, if a requests latency is
+     * configured, pauses for that time.
+     *
+     * @param itemsProcessed Number of items processed.
+     */
+    private void updateMetrics(int itemsProcessed) {
+        if (warmup)
+            warmupMsgProcessed += itemsProcessed;
+        else
+            msgProcessed += itemsProcessed;
+
+        if (TestsHelper.getLoadTestsRequestsLatency() > 0) {
+            try {
+                Thread.sleep(TestsHelper.getLoadTestsRequestsLatency());
+
+                if (warmup)
+                    warmupSleepCnt++;
+                else
+                    sleepCnt++;
+            }
+            catch (Throwable ignored) {
+                // A failed or interrupted pause is deliberately ignored: it is not counted as a pause
+                // and the test loop goes on.
+            }
+        }
+    }
+
+    /**
+     * Adds failed items to the error counter. Failures during warm up are not counted.
+     *
+     * @param itemsFailed Number of items failed.
+     */
+    private void updateErrorMetrics(int itemsFailed) {
+        if (!warmup)
+            msgFailed += itemsFailed;
+    }
+
+    /**
+     * @return Errors percent formatted with two fraction digits and a dot as decimal separator.
+     */
+    private String formatErrorsPercent() {
+        return String.format("%.2f", getErrorsPercent()).replace(",", ".");
+    }
+
+    /**
+     * Logs intermediate statistics of the current period, at most once every 30 seconds.
+     */
+    private void reportStatistics() {
+        // Statistics should be reported only every 30 seconds.
+        if (System.currentTimeMillis() - statReportedTime < 30000)
+            return;
+
+        statReportedTime = System.currentTimeMillis();
+
+        long elapsed = statReportedTime - (warmup ? warmupStartTime : startTime);
+        long period = warmup ? TestsHelper.getLoadTestsWarmupPeriod() : TestsHelper.getLoadTestsExecutionTime();
+
+        // Long arithmetic is required here: elapsed milliseconds multiplied by 100 leave the int range
+        // after roughly six hours. A non-positive period is reported as fully completed.
+        int completed = period <= 0 ? 100 : (int)Math.min(100L, elapsed * 100 / period);
+
+        if (warmup) {
+            log.info("Warm up messages processed " + warmupMsgProcessed + ", " +
+                "speed " + getWarmUpSpeed() + " msg/sec, " + completed + "% completed");
+        }
+        else {
+            log.info("Messages processed " + msgProcessed + ", " +
+                "speed " + getSpeed() + " msg/sec, " + completed + "% completed, " +
+                "errors " + msgFailed + " / " + formatErrorsPercent() + "%");
+        }
+    }
+
+    /**
+     * Logs the final report: as an error together with the cause if the test was terminated by an error,
+     * as an info message otherwise.
+     */
+    private void reportTestCompletion() {
+        // SimpleDateFormat is not thread-safe and every worker thread builds its own report,
+        // so the formatter is created per call instead of being shared through a static field.
+        SimpleDateFormat timeFormatter = new SimpleDateFormat(TIME_PATTERN);
+
+        StringBuilder builder = new StringBuilder();
+
+        if (executionError != null)
+            builder.append("Test execution abnormally terminated. ");
+        else
+            builder.append("Test execution successfully completed. ");
+
+        builder.append("Statistics: ").append(SystemHelper.LINE_SEPARATOR);
+        builder.append("Start time: ").append(timeFormatter.format(testStartTime)).append(SystemHelper.LINE_SEPARATOR);
+        builder.append("Finish time: ").append(timeFormatter.format(finishTime)).append(SystemHelper.LINE_SEPARATOR);
+        builder.append("Duration: ").append((finishTime - testStartTime) / 1000).append(" sec")
+            .append(SystemHelper.LINE_SEPARATOR);
+
+        if (TestsHelper.getLoadTestsWarmupPeriod() > 0) {
+            builder.append("Warm up period: ").append(TestsHelper.getLoadTestsWarmupPeriod() / 1000)
+                .append(" sec").append(SystemHelper.LINE_SEPARATOR);
+            builder.append("Warm up processed messages: ").append(warmupMsgProcessed).append(SystemHelper.LINE_SEPARATOR);
+            builder.append("Warm up processing speed: ").append(getWarmUpSpeed())
+                .append(" msg/sec").append(SystemHelper.LINE_SEPARATOR);
+        }
+
+        builder.append("Processed messages: ").append(msgProcessed).append(SystemHelper.LINE_SEPARATOR);
+        builder.append("Processing speed: ").append(getSpeed()).append(" msg/sec").append(SystemHelper.LINE_SEPARATOR);
+        builder.append("Errors: ").append(msgFailed).append(" / ").
+                append(formatErrorsPercent()).append("%");
+
+        if (executionError != null)
+            log.error(builder.toString(), executionError);
+        else
+            log.info(builder.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/BulkReadWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/BulkReadWorker.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/BulkReadWorker.java
new file mode 100644
index 0000000..38f0db8
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/BulkReadWorker.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load.cassandra;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.tests.load.Worker;
+import org.apache.ignite.tests.utils.TestsHelper;
+
+/**
+ * Cassandra direct load tests worker for bulk read operation CacheStore.load
+ */
+public class BulkReadWorker extends Worker {
+    /** */
+    public static final String LOGGER_NAME = "CassandraBulkReadLoadTest";
+
+    /** */
+    private List<Object> keys = new ArrayList<>(TestsHelper.getBulkOperationSize());
+
+    /** */
+    public BulkReadWorker(CacheStore cacheStore, long startPosition, long endPosition) {
+        super(cacheStore, startPosition, endPosition);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String loggerName() {
+        return LOGGER_NAME;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean batchMode() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void process(CacheStore cacheStore, Collection<CacheEntryImpl> entries) {
+        keys.clear();
+
+        for (CacheEntryImpl entry : entries)
+            keys.add(entry.getKey());
+
+        cacheStore.loadAll(keys);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/BulkWriteWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/BulkWriteWorker.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/BulkWriteWorker.java
new file mode 100644
index 0000000..c71728f
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/BulkWriteWorker.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load.cassandra;
+
+import java.util.Collection;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.tests.load.Worker;
+
+/**
+ * Cassandra direct load tests worker for the bulk write operation {@code CacheStore.writeAll}.
+ */
+public class BulkWriteWorker extends Worker {
+    /** Name of the logger used by this worker. */
+    public static final String LOGGER_NAME = "CassandraBulkWriteLoadTest";
+
+    /**
+     * @param cacheStore Cache store to write to.
+     * @param startPosition Start position passed to the base worker.
+     * @param endPosition End position passed to the base worker.
+     */
+    public BulkWriteWorker(CacheStore cacheStore, long startPosition, long endPosition) {
+        super(cacheStore, startPosition, endPosition);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean batchMode() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String loggerName() {
+        return LOGGER_NAME;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void process(CacheStore cacheStore, Collection<CacheEntryImpl> entries) {
+        Collection<CacheEntryImpl> batch = entries;
+
+        cacheStore.writeAll(batch);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/ReadWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/ReadWorker.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/ReadWorker.java
new file mode 100644
index 0000000..051b55f
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/ReadWorker.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load.cassandra;
+
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.tests.load.Worker;
+
+/**
+ * Cassandra direct load tests worker for read operation CacheStore.load
+ */
+public class ReadWorker extends Worker {
+    /** */
+    public static final String LOGGER_NAME = "CassandraReadLoadTest";
+
+    /** */
+    public ReadWorker(CacheStore cacheStore, long startPosition, long endPosition) {
+        super(cacheStore, startPosition, endPosition);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String loggerName() {
+        return LOGGER_NAME;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean batchMode() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void process(CacheStore cacheStore, CacheEntryImpl entry) {
+        cacheStore.load(entry.getKey());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/WriteWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/WriteWorker.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/WriteWorker.java
new file mode 100644
index 0000000..2b10bcd
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/cassandra/WriteWorker.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load.cassandra;
+
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.tests.load.Worker;
+
+/**
+ * Cassandra direct load tests worker for write operation CacheStore.write
+ */
+public class WriteWorker extends Worker {
+    /** */
+    public static final String LOGGER_NAME = "CassandraWriteLoadTest";
+
+    /** */
+    public WriteWorker(CacheStore cacheStore, long startPosition, long endPosition) {
+        super(cacheStore, startPosition, endPosition);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String loggerName() {
+        return LOGGER_NAME;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean batchMode() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void process(CacheStore cacheStore, CacheEntryImpl entry) {
+        cacheStore.write(entry);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/BulkReadWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/BulkReadWorker.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/BulkReadWorker.java
new file mode 100644
index 0000000..c20d0ce
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/BulkReadWorker.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load.ignite;
+
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.tests.load.Worker;
+
+/**
+ * Ignite load tests worker for the bulk read operation {@code IgniteCache.getAll}.
+ */
+public class BulkReadWorker extends Worker {
+    /** Name of the logger used by this worker. */
+    public static final String LOGGER_NAME = "IgniteBulkReadLoadTest";
+
+    /**
+     * @param ignite Ignite instance passed to the base worker.
+     * @param startPosition Start position passed to the base worker.
+     * @param endPosition End position passed to the base worker.
+     */
+    public BulkReadWorker(Ignite ignite, long startPosition, long endPosition) {
+        super(ignite, startPosition, endPosition);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean batchMode() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String loggerName() {
+        return LOGGER_NAME;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void process(IgniteCache cache, Map entries) {
+        // Only the keys of the generated batch are needed for a read.
+        cache.getAll(entries.keySet());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/BulkWriteWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/BulkWriteWorker.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/BulkWriteWorker.java
new file mode 100644
index 0000000..1ce7be3
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/BulkWriteWorker.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load.ignite;
+
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.tests.load.Worker;
+
+/**
+ * Ignite load tests worker for the bulk write operation CacheStore.writeAll, driven via {@code IgniteCache.putAll}.
+ */
+public class BulkWriteWorker extends Worker {
+    /** Logger name returned by {@link #loggerName()}. */
+    public static final String LOGGER_NAME = "IgniteBulkWriteLoadTest";
+
+    /** Passes the Ignite instance and the start/end positions straight to the {@link Worker} constructor. */
+    public BulkWriteWorker(Ignite ignite, long startPosition, long endPosition) {
+        super(ignite, startPosition, endPosition);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean batchMode() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String loggerName() {
+        return LOGGER_NAME;
+    }
+
+    /** Stores the whole batch of key/value pairs with one {@code putAll} call. */
+    @SuppressWarnings("unchecked")
+    @Override protected void process(IgniteCache cache, Map entries) {
+        cache.putAll(entries);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/ReadWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/ReadWorker.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/ReadWorker.java
new file mode 100644
index 0000000..35f7d39
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/ReadWorker.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load.ignite;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.tests.load.Worker;
+
+/**
+ * Ignite load tests worker for the single-key read operation CacheStore.load, driven via {@code IgniteCache.get}.
+ */
+public class ReadWorker extends Worker {
+    /** Logger name returned by {@link #loggerName()}. */
+    public static final String LOGGER_NAME = "IgniteReadLoadTest";
+
+    /** Passes the Ignite instance and the start/end positions straight to the {@link Worker} constructor. */
+    public ReadWorker(Ignite ignite, long startPosition, long endPosition) {
+        super(ignite, startPosition, endPosition);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean batchMode() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String loggerName() {
+        return LOGGER_NAME;
+    }
+
+    /** Reads the entry for {@code key} with one {@code get} call; {@code val} is unused. */
+    @SuppressWarnings("unchecked")
+    @Override protected void process(IgniteCache cache, Object key, Object val) {
+        cache.get(key);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/WriteWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/WriteWorker.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/WriteWorker.java
new file mode 100644
index 0000000..bed7099
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/load/ignite/WriteWorker.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.load.ignite;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.tests.load.Worker;
+
+/**
+ * Ignite load tests worker for the single-entry write operation CacheStore.write, driven via {@code IgniteCache.put}.
+ */
+public class WriteWorker extends Worker {
+    /** Logger name returned by {@link #loggerName()}. */
+    public static final String LOGGER_NAME = "IgniteWriteLoadTest";
+
+    /** Passes the Ignite instance and the start/end positions straight to the {@link Worker} constructor. */
+    public WriteWorker(Ignite ignite, long startPosition, long endPosition) {
+        super(ignite, startPosition, endPosition);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean batchMode() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String loggerName() {
+        return LOGGER_NAME;
+    }
+
+    /** Stores the {@code key}/{@code val} pair with one {@code put} call. */
+    @SuppressWarnings("unchecked")
+    @Override protected void process(IgniteCache cache, Object key, Object val) {
+        cache.put(key, val);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/pojos/Person.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/pojos/Person.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/pojos/Person.java
new file mode 100644
index 0000000..8a1e623
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/pojos/Person.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.pojos;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Simple POJO which could be stored as a value in Ignite cache.
+ */
+public class Person implements Externalizable {
+    /** First name. */
+    private String firstName;
+
+    /** Last name. */
+    private String lastName;
+
+    /** Age. */
+    private int age;
+
+    /** Marital status flag. */
+    private boolean married;
+
+    /** Height. */
+    private long height;
+
+    /** Weight. */
+    private float weight;
+
+    /** Birth date. */
+    private Date birthDate;
+
+    /** Phone numbers. */
+    private List<String> phones;
+
+    /** No-arg constructor; {@link Externalizable} classes need a public one for deserialization. */
+    @SuppressWarnings("UnusedDeclaration")
+    public Person() {
+    }
+
+    /** Creates a person with every field set; {@code phones} is stored as passed, not copied. */
+    public Person(String firstName, String lastName, int age, boolean married,
+        long height, float weight, Date birthDate, List<String> phones) {
+        this.firstName = firstName;
+        this.lastName = lastName;
+        this.age = age;
+        this.married = married;
+        this.height = height;
+        this.weight = weight;
+        this.birthDate = birthDate;
+        this.phones = phones;
+    }
+
+    /** Writes the fields in declaration order; {@link #readExternal} reads them back in the same order. */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(firstName);
+        out.writeObject(lastName);
+        out.writeInt(age);
+        out.writeBoolean(married);
+        out.writeLong(height);
+        out.writeFloat(weight);
+        out.writeObject(birthDate);
+        out.writeObject(phones);
+    }
+
+    /** Restores the fields in exactly the order {@link #writeExternal} wrote them. */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        firstName = (String)in.readObject();
+        lastName = (String)in.readObject();
+        age = in.readInt();
+        married = in.readBoolean();
+        height = in.readLong();
+        weight = in.readFloat();
+        birthDate = (Date)in.readObject();
+        phones = (List<String>)in.readObject();
+    }
+
+    /** Compares every field, {@code phones} included; {@code weight} is compared via {@link Float#compare}. */
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+
+        if (!(obj instanceof Person))
+            return false;
+
+        Person person = (Person)obj;
+
+        return sameScalarFields(person) && same(phones, person.phones);
+    }
+
+    /** Hashes exactly the fields compared by {@link #equals(Object)}, so equal persons get equal hash codes. */
+    @Override public int hashCode() {
+        int res = hash(firstName);
+
+        res = 31 * res + hash(lastName);
+        res = 31 * res + age;
+        res = 31 * res + (married ? 1 : 0);
+        res = 31 * res + (int)(height ^ (height >>> 32));
+        res = 31 * res + Float.floatToIntBits(weight);
+        res = 31 * res + hash(birthDate);
+        res = 31 * res + hash(phones);
+
+        return res;
+    }
+
+    /**
+     * Compares all fields except {@code phones}, using the same rules as {@link #equals(Object)}.
+     * Returns {@code false} when {@code obj} is {@code null} or not a {@link Person}.
+     */
+    public boolean equalsPrimitiveFields(Object obj) {
+        return obj instanceof Person && sameScalarFields((Person)obj);
+    }
+
+    /** Compares all fields other than {@code phones}. */
+    private boolean sameScalarFields(Person person) {
+        return same(firstName, person.firstName) && same(lastName, person.lastName) &&
+            same(birthDate, person.birthDate) && age == person.age && married == person.married &&
+            height == person.height && Float.compare(weight, person.weight) == 0;
+    }
+
+    /** Null-safe equality, checked in both directions so the result stays symmetric. */
+    private static boolean same(Object a, Object b) {
+        return a == null ? b == null : b != null && a.equals(b) && b.equals(a);
+    }
+
+    /** Null-safe hash code: {@code 0} for {@code null}. */
+    private static int hash(Object o) {
+        return o == null ? 0 : o.hashCode();
+    }
+
+    /** Sets the first name. */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setFirstName(String name) {
+        firstName = name;
+    }
+
+    /** Returns the first name. */
+    @SuppressWarnings("UnusedDeclaration")
+    public String getFirstName() {
+        return firstName;
+    }
+
+    /** Sets the last name. */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setLastName(String name) {
+        lastName = name;
+    }
+
+    /** Returns the last name. */
+    @SuppressWarnings("UnusedDeclaration")
+    public String getLastName() {
+        return lastName;
+    }
+
+    /** Sets the age. */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setAge(int age) {
+        this.age = age;
+    }
+
+    /** Returns the age. */
+    @SuppressWarnings("UnusedDeclaration")
+    public int getAge() {
+        return age;
+    }
+
+    /** Sets the marital status flag. */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setMarried(boolean married) {
+        this.married = married;
+    }
+
+    /** Returns the marital status flag. */
+    @SuppressWarnings("UnusedDeclaration")
+    public boolean getMarried() {
+        return married;
+    }
+
+    /** Sets the height. */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setHeight(long height) {
+        this.height = height;
+    }
+
+    /** Returns the height. */
+    @SuppressWarnings("UnusedDeclaration")
+    public long getHeight() {
+        return height;
+    }
+
+    /** Sets the weight. */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setWeight(float weight) {
+        this.weight = weight;
+    }
+
+    /** Returns the weight. */
+    @SuppressWarnings("UnusedDeclaration")
+    public float getWeight() {
+        return weight;
+    }
+
+    /** Sets the birth date. */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setBirthDate(Date date) {
+        birthDate = date;
+    }
+
+    /** Returns the birth date. */
+    @SuppressWarnings("UnusedDeclaration")
+    public Date getBirthDate() {
+        return birthDate;
+    }
+
+    /** Sets the phone list; the list is stored as passed, not copied. */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setPhones(List<String> phones) {
+        this.phones = phones;
+    }
+
+    /** Returns the stored phone list itself, not a copy. */
+    @SuppressWarnings("UnusedDeclaration")
+    public List<String> getPhones() {
+        return phones;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/pojos/PersonId.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/pojos/PersonId.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/pojos/PersonId.java
new file mode 100644
index 0000000..0dd5ab8
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/pojos/PersonId.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.pojos;
+
+import java.io.Serializable;
+
+/**
+ * Simple POJO which could be stored as a key in Ignite cache.
+ */
+public class PersonId implements Serializable {
+    /** Company code. */
+    private String companyCode;
+
+    /** Department code. */
+    private String departmentCode;
+
+    /** Person number. */
+    private long personNum;
+
+    /** No-arg constructor; leaves both codes {@code null} and the person number {@code 0}. */
+    @SuppressWarnings("UnusedDeclaration")
+    public PersonId() {
+    }
+
+    /** Creates an id from its three components. */
+    public PersonId(String companyCode, String departmentCode, long personNum) {
+        this.companyCode = companyCode;
+        this.departmentCode = departmentCode;
+        this.personNum = personNum;
+    }
+
+    /** Two ids are equal when company code, department code and person number all match (null-safe). */
+    @SuppressWarnings("SimplifiableIfStatement")
+    @Override public boolean equals(Object obj) {
+        if (obj == null || !(obj instanceof PersonId))
+            return false;
+
+        PersonId id = (PersonId)obj;
+
+        if ((companyCode != null && !companyCode.equals(id.companyCode)) ||
+            (id.companyCode != null && !id.companyCode.equals(companyCode)))
+            return false;
+
+        if ((departmentCode != null && !departmentCode.equals(id.departmentCode)) ||
+            (id.departmentCode != null && !id.departmentCode.equals(departmentCode)))
+            return false;
+
+        return personNum == id.personNum;
+    }
+
+    /** Hashes the string "companyCode + departmentCode + personNum", treating {@code null} codes as empty. */
+    @Override public int hashCode() {
+        String code = (companyCode == null ? "" : companyCode) +
+            (departmentCode == null ? "" : departmentCode) +
+                personNum;
+
+        return code.hashCode();
+    }
+
+    /** Sets the company code. */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setCompanyCode(String code) {
+        companyCode = code;
+    }
+
+    /** Returns the company code. */
+    @SuppressWarnings("UnusedDeclaration")
+    public String getCompanyCode() {
+        return companyCode;
+    }
+
+    /** Sets the department code. */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setDepartmentCode(String code) {
+        departmentCode = code;
+    }
+
+    /** Returns the department code. */
+    @SuppressWarnings("UnusedDeclaration")
+    public String getDepartmentCode() {
+        return departmentCode;
+    }
+
+    /** Sets the person number. */
+    @SuppressWarnings("UnusedDeclaration")
+    public void setPersonNumber(long personNum) {
+        this.personNum = personNum;
+    }
+
+    /** Returns the person number. */
+    @SuppressWarnings("UnusedDeclaration")
+    public long getPersonNumber() {
+        return personNum;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CacheStoreHelper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CacheStoreHelper.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CacheStoreHelper.java
new file mode 100644
index 0000000..b5ff5ad
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CacheStoreHelper.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.utils;
+
+import java.lang.reflect.Field;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.cassandra.CassandraCacheStore;
+import org.apache.ignite.cache.store.cassandra.datasource.DataSource;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+import org.apache.ignite.logger.log4j.Log4JLogger;
+import org.apache.log4j.Logger;
+import org.springframework.core.io.Resource;
+
+/**
+ * Test-only factory that builds {@link CacheStore} instances backed by {@link CassandraCacheStore}.
+ */
+public class CacheStoreHelper {
+    /** Logger used when the caller does not pass one. */
+    private static final Logger LOGGER = Logger.getLogger(CacheStoreHelper.class.getName());
+
+    /** Same as the four-argument overload, logging through this class's own {@link #LOGGER}. */
+    public static CacheStore createCacheStore(String cacheName, Resource persistenceSettings, DataSource conn) {
+        return createCacheStore(cacheName, persistenceSettings, conn, LOGGER);
+    }
+
+    /** Creates a store and reflectively sets its {@code storeSes} and {@code log} fields. */
+    public static CacheStore createCacheStore(String cacheName, Resource persistenceSettings, DataSource conn,
+        Logger log) {
+        KeyValuePersistenceSettings settings = new KeyValuePersistenceSettings(persistenceSettings);
+        int cpus = Runtime.getRuntime().availableProcessors();
+        CassandraCacheStore<Integer, Integer> store = new CassandraCacheStore<>(conn, settings, cpus);
+
+        try {
+            Class<?> storeCls = CassandraCacheStore.class;
+            Field sesFld = storeCls.getDeclaredField("storeSes");
+            Field logFld = storeCls.getDeclaredField("log");
+
+            sesFld.setAccessible(true);
+            logFld.setAccessible(true);
+            sesFld.set(store, new TestCacheSession(cacheName));
+            logFld.set(store, new Log4JLogger(log));
+        }
+        catch (Throwable err) {
+            throw new RuntimeException("Failed to initialize test Ignite cache store", err);
+        }
+
+        return store;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java
new file mode 100644
index 0000000..66df6e7
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.utils;
+
+import org.apache.ignite.cache.store.cassandra.datasource.Credentials;
+
+/**
+ * {@link org.apache.ignite.cache.store.cassandra.datasource.Credentials} implementation that
+ * supplies the admin user/password obtained from {@link CassandraHelper} to establish Cassandra session.
+ */
+public class CassandraAdminCredentials implements Credentials {
+    /** Returns the value of {@link CassandraHelper#getAdminPassword()}. */
+    @Override public String getPassword() {
+        return CassandraHelper.getAdminPassword();
+    }
+
+    /** Returns the value of {@link CassandraHelper#getAdminUser()}. */
+    @Override public String getUser() {
+        return CassandraHelper.getAdminUser();
+    }
+}


Mime
View raw message