ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptupit...@apache.org
Subject [17/35] ignite git commit: IGNITE-3172 Refactoring Ignite-Cassandra serializers. - Fixes #956.
Date Wed, 14 Sep 2016 10:53:22 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java
new file mode 100644
index 0000000..4edd759
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains serializer implementations used to store BLOBs in Cassandra.
+ */
+package org.apache.ignite.cache.store.cassandra.serializer;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java
new file mode 100644
index 0000000..e43db1d
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+
+/**
+ * Provides information for batch operations (loadAll, deleteAll, writeAll) of Ignite cache
+ * backed by {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}.
+ *
+ * @param <R> Type of the result returned from batch operation.
+ * @param <V> Type of the value used in batch operation.
+ */
+public interface BatchExecutionAssistant<R, V> {
+    /**
+     * Indicates if Cassandra table existence is required for this batch operation.
+     *
+     * @return {@code true} if table existence is required.
+     */
+    public boolean tableExistenceRequired();
+
+    /**
+     * Returns the unbound CQL statement to be executed inside the batch operation.
+     *
+     * @return Unbound CQL statement.
+     */
+    public String getStatement();
+
+    /**
+     * Creates a bound statement from the prepared statement and the given object.
+     *
+     * @param statement Prepared statement to bind.
+     * @param obj Parameters for statement binding.
+     * @return Bound statement.
+     */
+    public BoundStatement bindStatement(PreparedStatement statement, V obj);
+
+    /**
+     * Returns Ignite cache key/value persistence settings.
+     *
+     * @return Persistence settings.
+     */
+    public KeyValuePersistenceSettings getPersistenceSettings();
+
+    /**
+     * Display name for the batch operation, used in log and error messages.
+     *
+     * @return Operation display name.
+     */
+    public String operationName();
+
+    /**
+     * Processes particular row inside batch operation.
+     *
+     * @param row Row to process.
+     * @param seqNum Sequential number of the row, i.e. the zero-based position of the corresponding
+     *      object in the iteration order of the data passed to the batch operation.
+     */
+    public void process(Row row, int seqNum);
+
+    /**
+     * Checks if row/object with specified sequential number is already processed.
+     * Objects reported as processed are not submitted again on a retry attempt.
+     *
+     * @param seqNum Object sequential number.
+     * @return {@code true} if object is already processed.
+     */
+    public boolean alreadyProcessed(int seqNum);
+
+    /**
+     * Returns the number of objects/rows processed so far.
+     *
+     * @return Number of processed objects/rows.
+     */
+    public int processedCount();
+
+    /**
+     * Returns the result accumulated by the batch operation.
+     *
+     * @return Batch operation result.
+     */
+    public R processedData();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java
new file mode 100644
index 0000000..387c98f
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+
+/**
+ * Provides information for loadCache operation of {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}.
+ */
+public interface BatchLoaderAssistant {
+    /**
+     * Returns name of the batch load operation, used in error messages.
+     *
+     * @return Operation name.
+     */
+    public String operationName();
+
+    /**
+     * Returns CQL statement to use in batch load operation.
+     *
+     * @return CQL statement for batch load operation.
+     */
+    public Statement getStatement();
+
+    /**
+     * Processes each row returned by batch load operation.
+     *
+     * @param row Row selected from Cassandra table.
+     */
+    public void process(Row row);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
new file mode 100644
index 0000000..506982f
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import java.io.Closeable;
+
+/**
+ * Wrapper around Cassandra driver session, to automatically handle:
+ * <ul>
+ *  <li>Keyspace and table absence exceptions</li>
+ *  <li>Timeout exceptions</li>
+ *  <li>Batch operations</li>
+ * </ul>
+ */
+public interface CassandraSession extends Closeable {
+    /**
+     * Executes single synchronous operation against Cassandra database.
+     *
+     * @param assistant Execution assistant that performs the main operation logic.
+     * @param <V> Type of the result returned from operation.
+     *
+     * @return Result of the operation.
+     */
+    public <V> V execute(ExecutionAssistant<V> assistant);
+
+    /**
+     * Executes batch asynchronous operation against Cassandra database.
+     *
+     * @param assistant Execution assistant that performs the main operation logic.
+     * @param data Data which should be processed in batch operation.
+     * @param <R> Type of the result returned from batch operation.
+     * @param <V> Type of the value used in batch operation.
+     *
+     * @return Result of the operation.
+     */
+    public <R, V> R execute(BatchExecutionAssistant<R, V> assistant, Iterable<? extends V> data);
+
+    /**
+     * Executes batch asynchronous operation to load a bunch of records
+     * specified by CQL statement from Cassandra database.
+     *
+     * @param assistant Execution assistant that performs the main operation logic.
+     */
+    public void execute(BatchLoaderAssistant assistant);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
new file mode 100644
index 0000000..95b8581
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
@@ -0,0 +1,832 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.datastax.driver.core.querybuilder.Batch;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
+import org.apache.ignite.cache.store.cassandra.common.RandomSleeper;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+import org.apache.ignite.cache.store.cassandra.session.pool.SessionPool;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+
+/**
+ * Implementation for {@link org.apache.ignite.cache.store.cassandra.session.CassandraSession}.
+ */
+public class CassandraSessionImpl implements CassandraSession {
+    /** Maximum number of attempts made to execute (or prepare) a CQL query before giving up. */
+    private static final int CQL_EXECUTION_ATTEMPTS_COUNT = 20;
+
+    /** Min timeout between CQL query execution attempts. */
+    private static final int CQL_EXECUTION_ATTEMPT_MIN_TIMEOUT = 100;
+
+    /** Max timeout between CQL query execution attempts. */
+    private static final int CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT = 500;
+
+    /** Timeout increment for CQL query execution attempts. */
+    private static final int CQL_ATTEMPTS_TIMEOUT_INCREMENT = 100;
+
+    /** Cassandra cluster builder, used to build a cluster and connect when no session is available. */
+    private volatile Cluster.Builder builder;
+
+    /** Cassandra driver session; {@code null} until first use and after {@link #close()} returns it to the pool. */
+    private volatile Session ses;
+
+    /** Number of references to Cassandra driver session (for multithreaded environment). */
+    private volatile int refCnt = 0;
+
+    /**
+     * Storage for the session prepared statements, keyed by CQL statement text.
+     * Access is guarded by synchronizing on the map itself.
+     * NOTE(review): the map is static, so it is shared by all instances of this class, and it is
+     * cleared whenever any instance recreates its driver session - confirm this sharing is intended.
+     */
+    private static final Map<String, PreparedStatement> sesStatements = new HashMap<>();
+
+    /** Number of records to immediately fetch in CQL statement execution. */
+    private Integer fetchSize;
+
+    /** Consistency level for Cassandra READ operations (select). */
+    private ConsistencyLevel readConsistency;
+
+    /** Consistency level for Cassandra WRITE operations (insert/update/delete). */
+    private ConsistencyLevel writeConsistency;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Table absence error handlers counter. */
+    private final AtomicInteger tblAbsenceHandlersCnt = new AtomicInteger(-1);
+
+    /** Prepared statement cluster disconnection error handlers counter. */
+    private final AtomicInteger prepStatementHandlersCnt = new AtomicInteger(-1);
+
+    /**
+     * Builds a wrapper around the Cassandra driver session. The constructor only stores
+     * its arguments.
+     *
+     * @param builder Builder for Cassandra cluster.
+     * @param fetchSize Number of rows to immediately fetch in CQL statement execution.
+     * @param readConsistency Consistency level for Cassandra READ operations (select).
+     * @param writeConsistency Consistency level for Cassandra WRITE operations (insert/update/delete).
+     * @param log Logger.
+     */
+    public CassandraSessionImpl(Cluster.Builder builder, Integer fetchSize, ConsistencyLevel readConsistency,
+        ConsistencyLevel writeConsistency, IgniteLogger log) {
+        // Diagnostics first, then connection source, then statement tuning options.
+        this.log = log;
+
+        this.builder = builder;
+
+        this.readConsistency = readConsistency;
+        this.writeConsistency = writeConsistency;
+        this.fetchSize = fetchSize;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The statement is retried up to {@code CQL_EXECUTION_ATTEMPTS_COUNT} times for errors recognized
+     * by {@link CassandraHelper} (table absence, hosts availability, prepared statement cluster error).
+     * Any other error terminates the operation immediately.
+     */
+    @Override public <V> V execute(ExecutionAssistant<V> assistant) {
+        String errorMsg = "Failed to execute Cassandra CQL statement: " + assistant.getStatement();
+        Throwable error = null;
+
+        RandomSleeper sleeper = newSleeper();
+
+        incrementSessionRefs();
+
+        try {
+            for (int attempt = 0; attempt < CQL_EXECUTION_ATTEMPTS_COUNT; attempt++) {
+                error = null;
+
+                if (attempt > 0) {
+                    log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra CQL statement: " +
+                            assistant.getStatement());
+                }
+
+                try {
+                    PreparedStatement preparedSt = prepareStatement(assistant.getStatement(),
+                        assistant.getPersistenceSettings(), assistant.tableExistenceRequired());
+
+                    // No prepared statement means there is nothing to execute.
+                    if (preparedSt == null)
+                        return null;
+
+                    Statement tuned = tuneStatementExecutionOptions(assistant.bindStatement(preparedSt));
+                    ResultSet res = session().execute(tuned);
+
+                    // Empty result: nothing to hand over to the assistant.
+                    if (res == null || !res.iterator().hasNext())
+                        return null;
+
+                    Row first = res.iterator().next();
+
+                    if (first == null)
+                        return null;
+
+                    return assistant.process(first);
+                }
+                catch (Throwable e) {
+                    error = e;
+
+                    if (CassandraHelper.isTableAbsenceError(e)) {
+                        // Missing table is acceptable for operations which do not require it.
+                        if (!assistant.tableExistenceRequired()) {
+                            log.warning(errorMsg, e);
+
+                            return null;
+                        }
+
+                        handleTableAbsenceError(assistant.getPersistenceSettings());
+                    }
+                    else if (CassandraHelper.isHostsAvailabilityError(e))
+                        handleHostsAvailabilityError(e, attempt, errorMsg);
+                    else if (CassandraHelper.isPreparedStatementClusterError(e))
+                        handlePreparedStatementClusterError(e);
+                    else {
+                        // Unknown kind of error: no point in retrying, terminate.
+                        throw new IgniteException(errorMsg, e);
+                    }
+                }
+
+                sleeper.sleep();
+            }
+        }
+        catch (Throwable e) {
+            error = e;
+        }
+        finally {
+            decrementSessionRefs();
+        }
+
+        // Reached only on failure: either attempts are exhausted or an error escaped the retry loop.
+        log.error(errorMsg, error);
+
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Each attempt consists of two phases: first a statement is submitted asynchronously for every
+     * object not yet reported as processed by the assistant, then all obtained futures are awaited and
+     * their first rows are passed to the assistant. If a recoverable error (table absence, hosts
+     * availability, prepared statement cluster error) was seen, the whole cycle is repeated, up to
+     * {@code CQL_EXECUTION_ATTEMPTS_COUNT} times. Any other error terminates the operation.
+     *
+     * @throws IgniteException If the operation failed or attempts were exhausted.
+     */
+    @Override public <R, V> R execute(BatchExecutionAssistant<R, V> assistant, Iterable<? extends V> data) {
+        // Nothing to process - return whatever the assistant has accumulated so far.
+        if (data == null || !data.iterator().hasNext())
+            return assistant.processedData();
+
+        int attempt = 0;
+        String errorMsg = "Failed to execute Cassandra " + assistant.operationName() + " operation";
+        Throwable error = new IgniteException(errorMsg);
+
+        RandomSleeper sleeper = newSleeper();
+
+        int dataSize = 0;
+
+        incrementSessionRefs();
+
+        try {
+            while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+                if (attempt != 0) {
+                    log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra batch " +
+                            assistant.operationName() + " operation to process rest " +
+                            (dataSize - assistant.processedCount()) + " of " + dataSize + " elements");
+                }
+
+                // Clean errors info before next communication with Cassandra.
+                Throwable unknownEx = null;
+                Throwable tblAbsenceEx = null;
+                Throwable hostsAvailEx = null;
+                Throwable prepStatEx = null;
+
+                // Pairs of (object sequential number, future of its statement execution).
+                List<Cache.Entry<Integer, ResultSetFuture>> futResults = new LinkedList<>();
+
+                PreparedStatement preparedSt = prepareStatement(assistant.getStatement(),
+                    assistant.getPersistenceSettings(), assistant.tableExistenceRequired());
+
+                if (preparedSt == null)
+                    return null;
+
+                int seqNum = 0;
+
+                // Phase 1: submit statements for all objects which are not processed yet.
+                for (V obj : data) {
+                    if (!assistant.alreadyProcessed(seqNum)) {
+                        try {
+                            Statement statement = tuneStatementExecutionOptions(assistant.bindStatement(preparedSt, obj));
+                            ResultSetFuture fut = session().executeAsync(statement);
+                            futResults.add(new CacheEntryImpl<>(seqNum, fut));
+                        }
+                        catch (Throwable e) {
+                            if (CassandraHelper.isTableAbsenceError(e)) {
+                                // If there are table absence error and it is not required for the operation we can return.
+                                if (!assistant.tableExistenceRequired())
+                                    return assistant.processedData();
+
+                                tblAbsenceEx = e;
+                                handleTableAbsenceError(assistant.getPersistenceSettings());
+                            }
+                            else if (CassandraHelper.isHostsAvailabilityError(e)) {
+                                // Handle host availability only once per attempt. The "first occurrence"
+                                // check must be done before the error is remembered, otherwise the
+                                // condition can never hold and the handler is never invoked.
+                                boolean firstHostsAvailErr = hostsAvailEx == null;
+
+                                hostsAvailEx = e;
+
+                                if (firstHostsAvailErr)
+                                    handleHostsAvailabilityError(e, attempt, errorMsg);
+                            }
+                            else if (CassandraHelper.isPreparedStatementClusterError(e)) {
+                                prepStatEx = e;
+                                handlePreparedStatementClusterError(e);
+                            }
+                            else
+                                unknownEx = e;
+                        }
+                    }
+
+                    seqNum++;
+                }
+
+                dataSize = seqNum;
+
+                // For an error which we don't know how to handle, we will not try next attempts and terminate.
+                if (unknownEx != null)
+                    throw new IgniteException(errorMsg, unknownEx);
+
+                // Remembering any of last errors.
+                if (tblAbsenceEx != null)
+                    error = tblAbsenceEx;
+                else if (hostsAvailEx != null)
+                    error = hostsAvailEx;
+                else if (prepStatEx != null)
+                    error = prepStatEx;
+
+                // Clean errors info before next communication with Cassandra.
+                unknownEx = null;
+                tblAbsenceEx = null;
+                hostsAvailEx = null;
+                prepStatEx = null;
+
+                // Phase 2: wait for submitted statements and pass their results to the assistant.
+                for (Cache.Entry<Integer, ResultSetFuture> futureResult : futResults) {
+                    try {
+                        ResultSet resSet = futureResult.getValue().getUninterruptibly();
+                        Row row = resSet != null && resSet.iterator().hasNext() ? resSet.iterator().next() : null;
+
+                        if (row != null)
+                            assistant.process(row, futureResult.getKey());
+                    }
+                    catch (Throwable e) {
+                        // Errors are only classified here; they are handled once, after all futures completed.
+                        if (CassandraHelper.isTableAbsenceError(e))
+                            tblAbsenceEx = e;
+                        else if (CassandraHelper.isHostsAvailabilityError(e))
+                            hostsAvailEx = e;
+                        else if (CassandraHelper.isPreparedStatementClusterError(e))
+                            prepStatEx = e;
+                        else
+                            unknownEx = e;
+                    }
+                }
+
+                // For an error which we don't know how to handle, we will not try next attempts and terminate.
+                if (unknownEx != null)
+                    throw new IgniteException(errorMsg, unknownEx);
+
+                // If there are no errors occurred it means that operation successfully completed and we can return.
+                if (tblAbsenceEx == null && hostsAvailEx == null && prepStatEx == null)
+                    return assistant.processedData();
+
+                if (tblAbsenceEx != null) {
+                    // If there are table absence error and it is not required for the operation we can return.
+                    if (!assistant.tableExistenceRequired())
+                        return assistant.processedData();
+
+                    error = tblAbsenceEx;
+                    handleTableAbsenceError(assistant.getPersistenceSettings());
+                }
+
+                if (hostsAvailEx != null) {
+                    error = hostsAvailEx;
+                    handleHostsAvailabilityError(hostsAvailEx, attempt, errorMsg);
+                }
+
+                if (prepStatEx != null) {
+                    error = prepStatEx;
+                    handlePreparedStatementClusterError(prepStatEx);
+                }
+
+                sleeper.sleep();
+
+                attempt++;
+            }
+        }
+        catch (Throwable e) {
+            error = e;
+        }
+        finally {
+            decrementSessionRefs();
+        }
+
+        // Reached only on failure: either attempts are exhausted or an error escaped the retry loop.
+        errorMsg = "Failed to process " + (dataSize - assistant.processedCount()) +
+            " of " + dataSize + " elements, during " + assistant.operationName() +
+            " operation with Cassandra";
+
+        log.error(errorMsg, error);
+
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The statement is executed and every returned row is passed to the assistant. A table absence
+     * error ends the operation silently; hosts availability and prepared statement cluster errors
+     * trigger a retry, up to {@code CQL_EXECUTION_ATTEMPTS_COUNT} attempts.
+     */
+    @Override public void execute(BatchLoaderAssistant assistant) {
+        String errorMsg = "Failed to execute Cassandra " + assistant.operationName() + " operation";
+        Throwable error = new IgniteException(errorMsg);
+
+        RandomSleeper sleeper = newSleeper();
+
+        incrementSessionRefs();
+
+        try {
+            for (int attempt = 0; attempt < CQL_EXECUTION_ATTEMPTS_COUNT; attempt++) {
+                if (attempt > 0)
+                    log.warning("Trying " + (attempt + 1) + " attempt to load Ignite cache");
+
+                Statement statement = tuneStatementExecutionOptions(assistant.getStatement());
+
+                try {
+                    ResultSet resSet = session().executeAsync(statement).getUninterruptibly();
+
+                    if (resSet != null && resSet.iterator().hasNext()) {
+                        for (Row row : resSet)
+                            assistant.process(row);
+                    }
+
+                    // Either all rows were processed or there was nothing to load.
+                    return;
+                }
+                catch (Throwable e) {
+                    error = e;
+
+                    // No table - no data to load.
+                    if (CassandraHelper.isTableAbsenceError(e))
+                        return;
+
+                    if (CassandraHelper.isHostsAvailabilityError(e))
+                        handleHostsAvailabilityError(e, attempt, errorMsg);
+                    else if (CassandraHelper.isPreparedStatementClusterError(e))
+                        handlePreparedStatementClusterError(e);
+                    else {
+                        // Unknown kind of error: no point in retrying, terminate.
+                        throw new IgniteException(errorMsg, e);
+                    }
+                }
+
+                sleeper.sleep();
+            }
+        }
+        catch (Throwable e) {
+            error = e;
+        }
+        finally {
+            decrementSessionRefs();
+        }
+
+        // Reached only on failure: either attempts are exhausted or an error escaped the retry loop.
+        log.error(errorMsg, error);
+
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Releases one reference to the driver session. When no references remain and a session
+     * is held, the session is handed over to {@link SessionPool}.
+     */
+    @Override public synchronized void close() throws IOException {
+        // Still referenced, or there is no session to give back.
+        if (decrementSessionRefs() != 0 || ses == null)
+            return;
+
+        SessionPool.put(this, ses);
+
+        ses = null;
+    }
+
+    /**
+     * Recreates Cassandra driver session: closes the current driver session, obtains a new one
+     * through {@link #session()} and drops all cached prepared statements.
+     */
+    private synchronized void refresh() {
+        // Make sure that session is removed from the pool (presumably get() takes it out - see SessionPool).
+        SessionPool.get(this);
+
+        // Closing and reopening session.
+        CassandraHelper.closeSession(ses);
+        ses = null;
+        session();
+
+        synchronized (sesStatements) {
+            sesStatements.clear();
+        }
+    }
+
+    /**
+     * Returns the driver session to work with: the one already held, otherwise the one obtained from
+     * {@link SessionPool}, otherwise a newly established one. Cached prepared statements are dropped
+     * before a new session is established.
+     *
+     * @return Cassandra driver session.
+     */
+    private synchronized Session session() {
+        // Nothing held yet - try to reuse a pooled session first.
+        if (ses == null)
+            ses = SessionPool.get(this);
+
+        if (ses != null)
+            return ses;
+
+        synchronized (sesStatements) {
+            sesStatements.clear();
+        }
+
+        try {
+            Session created = builder.build().connect();
+
+            ses = created;
+
+            return created;
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to establish session with Cassandra database", e);
+        }
+    }
+
+    /**
+     * Increments number of references to Cassandra driver session (required for multithreaded environment).
+     * Every call is expected to be paired with {@link #decrementSessionRefs()}, typically in a
+     * {@code finally} block.
+     */
+    private synchronized void incrementSessionRefs() {
+        refCnt++;
+    }
+
+    /**
+     * Releases one reference to Cassandra driver session (required for multithreaded environment).
+     * The counter is left untouched when it is already zero.
+     *
+     * @return Number of references remaining after the call.
+     */
+    private synchronized int decrementSessionRefs() {
+        // Nothing to release.
+        if (refCnt == 0)
+            return 0;
+
+        refCnt--;
+
+        return refCnt;
+    }
+
+    /**
+     * Returns a prepared statement for the given CQL text. A statement cached in {@code sesStatements}
+     * is reused; otherwise the statement is prepared with the current driver session and cached.
+     * Preparation is retried up to {@code CQL_EXECUTION_ATTEMPTS_COUNT} times on table absence and
+     * hosts availability errors; any other error terminates immediately.
+     *
+     * @param statement CQL statement.
+     * @param settings Persistence settings.
+     * @param tblExistenceRequired Flag indicating if table existence is required for the statement.
+     * @return Prepared statement, or {@code null} if the table is absent and its existence is not required.
+     */
+    private PreparedStatement prepareStatement(String statement, KeyValuePersistenceSettings settings,
+        boolean tblExistenceRequired) {
+        String errorMsg = "Failed to prepare Cassandra CQL statement: " + statement;
+        Throwable error = null;
+
+        RandomSleeper sleeper = newSleeper();
+
+        incrementSessionRefs();
+
+        try {
+            // Fast path: statement was already prepared.
+            synchronized (sesStatements) {
+                if (sesStatements.containsKey(statement))
+                    return sesStatements.get(statement);
+            }
+
+            for (int attempt = 0; attempt < CQL_EXECUTION_ATTEMPTS_COUNT; attempt++) {
+                try {
+                    PreparedStatement prepared = session().prepare(statement);
+
+                    synchronized (sesStatements) {
+                        sesStatements.put(statement, prepared);
+                    }
+
+                    return prepared;
+                }
+                catch (Throwable e) {
+                    boolean tblAbsent = CassandraHelper.isTableAbsenceError(e);
+
+                    // Neither of the recoverable kinds of error: terminate.
+                    if (!tblAbsent && !CassandraHelper.isHostsAvailabilityError(e))
+                        throw new IgniteException(errorMsg, e);
+
+                    if (tblAbsent) {
+                        if (!tblExistenceRequired)
+                            return null;
+
+                        handleTableAbsenceError(settings);
+                    }
+                    else
+                        handleHostsAvailabilityError(e, attempt, errorMsg);
+
+                    error = e;
+                }
+
+                sleeper.sleep();
+            }
+        }
+        finally {
+            decrementSessionRefs();
+        }
+
+        // All attempts are exhausted.
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /**
+     * Creates Cassandra keyspace.
+     *
+     * @param settings Persistence settings.
+     */
+    private void createKeyspace(KeyValuePersistenceSettings settings) {
+        int attempt = 0;
+        Throwable error = null;
+        String errorMsg = "Failed to create Cassandra keyspace '" + settings.getKeyspace() + "'";
+
+        while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+            try {
+                log.info("-----------------------------------------------------------------------");
+                log.info("Creating Cassandra keyspace '" + settings.getKeyspace() + "'");
+                log.info("-----------------------------------------------------------------------\n\n" +
+                    settings.getKeyspaceDDLStatement() + "\n");
+                log.info("-----------------------------------------------------------------------");
+                session().execute(settings.getKeyspaceDDLStatement());
+                log.info("Cassandra keyspace '" + settings.getKeyspace() + "' was successfully created");
+                return;
+            }
+            catch (AlreadyExistsException ignored) {
+                log.info("Cassandra keyspace '" + settings.getKeyspace() + "' already exist");
+                return;
+            }
+            catch (Throwable e) {
+                if (!CassandraHelper.isHostsAvailabilityError(e))
+                    throw new IgniteException(errorMsg, e);
+
+                handleHostsAvailabilityError(e, attempt, errorMsg);
+
+                error = e;
+            }
+
+            attempt++;
+        }
+
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /**
+     * Creates Cassandra table.
+     *
+     * @param settings Persistence settings.
+     */
+    private void createTable(KeyValuePersistenceSettings settings) {
+        int attempt = 0;
+        Throwable error = null;
+        String errorMsg = "Failed to create Cassandra table '" + settings.getTableFullName() + "'";
+
+        while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+            try {
+                log.info("-----------------------------------------------------------------------");
+                log.info("Creating Cassandra table '" + settings.getTableFullName() + "'");
+                log.info("-----------------------------------------------------------------------\n\n" +
+                    settings.getTableDDLStatement() + "\n");
+                log.info("-----------------------------------------------------------------------");
+                session().execute(settings.getTableDDLStatement());
+                log.info("Cassandra table '" + settings.getTableFullName() + "' was successfully created");
+                return;
+            }
+            catch (AlreadyExistsException ignored) {
+                log.info("Cassandra table '" + settings.getTableFullName() + "' already exist");
+                return;
+            }
+            catch (Throwable e) {
+                if (!CassandraHelper.isHostsAvailabilityError(e) && !CassandraHelper.isKeyspaceAbsenceError(e))
+                    throw new IgniteException(errorMsg, e);
+
+                if (CassandraHelper.isKeyspaceAbsenceError(e)) {
+                    log.warning("Failed to create Cassandra table '" + settings.getTableFullName() +
+                        "' cause appropriate keyspace doesn't exist", e);
+                    createKeyspace(settings);
+                }
+                else if (CassandraHelper.isHostsAvailabilityError(e))
+                    handleHostsAvailabilityError(e, attempt, errorMsg);
+
+                error = e;
+            }
+
+            attempt++;
+        }
+
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /**
+     * Creates Cassandra table indexes.
+     * <p>
+     * Does nothing if persistence settings define no index DDL statements. Indexes which already exist
+     * are skipped. If the table is absent it is created and index creation is attempted again; hosts
+     * availability errors are retried as well, up to {@code CQL_EXECUTION_ATTEMPTS_COUNT} attempts in total.
+     *
+     * @param settings Persistence settings.
+     * @throws IgniteException If indexes can't be created.
+     */
+    private void createTableIndexes(KeyValuePersistenceSettings settings) {
+        if (settings.getIndexDDLStatements() == null || settings.getIndexDDLStatements().isEmpty())
+            return;
+
+        int attempt = 0;
+        Throwable error = null;
+        String errorMsg = "Failed to create indexes for Cassandra table " + settings.getTableFullName();
+
+        while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+            try {
+                log.info("Creating indexes for Cassandra table '" + settings.getTableFullName() + "'");
+
+                for (String statement : settings.getIndexDDLStatements()) {
+                    try {
+                        session().execute(statement);
+                    }
+                    catch (AlreadyExistsException ignored) {
+                        // Index is already there - nothing to do for this statement.
+                    }
+                    catch (Throwable e) {
+                        // Constant-first comparison: an exception without a message must not cause NPE here,
+                        // otherwise the original failure would be masked by the NPE.
+                        boolean idxExists = e instanceof InvalidQueryException &&
+                            "Index already exists".equals(e.getMessage());
+
+                        // Wrapped failure is classified by the outer handler below.
+                        if (!idxExists)
+                            throw new IgniteException(errorMsg, e);
+                    }
+                }
+
+                log.info("Indexes for Cassandra table '" + settings.getTableFullName() + "' were successfully created");
+
+                return;
+            }
+            catch (Throwable e) {
+                if (CassandraHelper.isHostsAvailabilityError(e))
+                    handleHostsAvailabilityError(e, attempt, errorMsg);
+                else if (CassandraHelper.isTableAbsenceError(e))
+                    createTable(settings);
+                else
+                    throw new IgniteException(errorMsg, e);
+
+                // Remember the last recoverable failure to report it if all attempts are used up.
+                error = e;
+            }
+
+            attempt++;
+        }
+
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /**
+     * Tunes CQL statement execution options (consistency level, fetch option and etc.).
+     * <p>
+     * Read consistency is applied to {@code select} statements, write consistency is applied to batches
+     * and to {@code insert}/{@code delete}/{@code update} statements, fetch size is applied to any statement.
+     * Each option is applied only if it is configured (not {@code null}). Query text is available only for
+     * bound and prepared statements, thus other non-batch statements get the fetch size only.
+     *
+     * @param statement Statement.
+     * @return The same statement instance with execution options applied.
+     */
+    private Statement tuneStatementExecutionOptions(Statement statement) {
+        String rawQry = "";
+
+        if (statement instanceof BoundStatement)
+            rawQry = ((BoundStatement)statement).preparedStatement().getQueryString();
+        else if (statement instanceof PreparedStatement)
+            rawQry = ((PreparedStatement)statement).getQueryString();
+
+        // Locale.ROOT keeps keyword detection independent from JVM default locale: with Turkish locale
+        // "INSERT".toLowerCase() produces dotless 'i' and would never match "insert".
+        String qry = rawQry.trim().toLowerCase(java.util.Locale.ROOT);
+
+        boolean readStatement = qry.startsWith("select");
+        boolean writeStatement = statement instanceof Batch || statement instanceof BatchStatement ||
+            qry.startsWith("insert") || qry.startsWith("delete") || qry.startsWith("update");
+
+        if (readStatement && readConsistency != null)
+            statement.setConsistencyLevel(readConsistency);
+
+        if (writeStatement && writeConsistency != null)
+            statement.setConsistencyLevel(writeConsistency);
+
+        if (fetchSize != null)
+            statement.setFetchSize(fetchSize);
+
+        return statement;
+    }
+
+    /**
+     * Handles situation when Cassandra table doesn't exist.
+     * <p>
+     * Only the thread which observes {@code 0} from {@code tblAbsenceHandlersCnt} creates keyspace, table
+     * and indexes. Other threads entering concurrently wait on the same monitor and return once they
+     * acquire it, without doing any DDL themselves.
+     *
+     * @param settings Persistence settings.
+     * @throws IgniteException If table creation failed with an error other than hosts availability error.
+     */
+    private void handleTableAbsenceError(KeyValuePersistenceSettings settings) {
+        // NOTE(review): presumably the counter starts at -1 (it is reset to -1 below), so the first
+        // handler observes 0 here - confirm against the field declaration.
+        int hndNum = tblAbsenceHandlersCnt.incrementAndGet();
+
+        try {
+            synchronized (tblAbsenceHandlersCnt) {
+                // This thread is not the first one which tried to handle table absence problem.
+                if (hndNum != 0) {
+                    log.warning("Table " + settings.getTableFullName() + " absence problem detected. " +
+                            "Another thread already fixed it.");
+                    return;
+                }
+
+                log.warning("Table " + settings.getTableFullName() + " absence problem detected. " +
+                        "Trying to create table.");
+
+                // Non-null initial value only serves to enter the loop below; it is cleared on each iteration.
+                IgniteException error = new IgniteException("Failed to create Cassandra table " + settings.getTableFullName());
+
+                int attempt = 0;
+
+                while (error != null && attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+                    error = null;
+
+                    try {
+                        createKeyspace(settings);
+                        createTable(settings);
+                        createTableIndexes(settings);
+                    }
+                    catch (Throwable e) {
+                        // Hosts availability problems are retried, everything else is fatal.
+                        if (CassandraHelper.isHostsAvailabilityError(e))
+                            handleHostsAvailabilityError(e, attempt, null);
+                        else
+                            throw new IgniteException("Failed to create Cassandra table " + settings.getTableFullName(), e);
+
+                        error = (e instanceof IgniteException) ? (IgniteException)e : new IgniteException(e);
+                    }
+
+                    attempt++;
+                }
+
+                // All attempts ended with hosts availability error - report the last one.
+                if (error != null)
+                    throw error;
+            }
+        }
+        finally {
+            // Only the thread which did the actual work resets the counter.
+            if (hndNum == 0)
+                tblAbsenceHandlersCnt.set(-1);
+        }
+    }
+
+    /**
+     * Handles situation when prepared statement execution failed cause session to the cluster was released.
+     *
+     */
+    private void handlePreparedStatementClusterError(Throwable e) {
+        int hndNum = prepStatementHandlersCnt.incrementAndGet();
+
+        try {
+            synchronized (prepStatementHandlersCnt) {
+                // Oooops... I am not the first thread who tried to handle prepared statement problem.
+                if (hndNum != 0) {
+                    log.warning("Prepared statement cluster error detected, another thread already fixed the problem", e);
+                    return;
+                }
+
+                log.warning("Prepared statement cluster error detected, refreshing Cassandra session", e);
+
+                refresh();
+
+                log.warning("Cassandra session refreshed");
+            }
+        }
+        finally {
+            if (hndNum == 0)
+                prepStatementHandlersCnt.set(-1);
+        }
+    }
+
+    /**
+     * Handles situation when Cassandra host which is responsible for CQL query execution became unavailable.
+     * <p>
+     * At a quarter, a half and three quarters of {@code CQL_EXECUTION_ATTEMPTS_COUNT}, and on the last
+     * attempt, Cassandra session is refreshed. On all other attempts the method just sleeps for
+     * {@code CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT} milliseconds.
+     *
+     * @param e Exception to handle.
+     * @param attempt Zero-based number of the failed attempt.
+     * @param msg Error message for the exception thrown when attempts are used up, may be {@code null}.
+     * @return {@code true} if Cassandra session was refreshed, {@code false} if the method only slept.
+     * @throws IgniteException If {@code attempt} reached {@code CQL_EXECUTION_ATTEMPTS_COUNT}.
+     */
+    private boolean handleHostsAvailabilityError(Throwable e, int attempt, String msg) {
+        if (attempt >= CQL_EXECUTION_ATTEMPTS_COUNT) {
+            log.error("Host availability problem detected. " +
+                    "Number of CQL execution attempts reached maximum " + CQL_EXECUTION_ATTEMPTS_COUNT +
+                    ", exception will be thrown to upper execution layer.", e);
+            throw msg == null ? new IgniteException(e) : new IgniteException(msg, e);
+        }
+
+        if (attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 4  ||
+            attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2  ||
+            attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2 + CQL_EXECUTION_ATTEMPTS_COUNT / 4  ||
+            attempt == CQL_EXECUTION_ATTEMPTS_COUNT - 1) {
+            log.warning("Host availability problem detected, CQL execution attempt  " + (attempt + 1) + ", " +
+                    "refreshing Cassandra session", e);
+
+            refresh();
+
+            log.warning("Cassandra session refreshed");
+
+            return true;
+        }
+
+        log.warning("Host availability problem detected, CQL execution attempt " + (attempt + 1) + ", " +
+                "sleeping extra " + CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT + " milliseconds", e);
+
+        try {
+            Thread.sleep(CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT);
+        }
+        catch (InterruptedException ignored) {
+            // Restore interrupt status, so the code up the stack is able to notice the interruption
+            // instead of silently losing it.
+            Thread.currentThread().interrupt();
+        }
+
+        log.warning("Sleep completed");
+
+        return false;
+    }
+
+    /**
+     * Creates sleeper configured with CQL execution attempt timeouts (min, max, increment) and the logger.
+     *
+     * @return New random sleeper.
+     */
+    private RandomSleeper newSleeper() {
+        return new RandomSleeper(CQL_EXECUTION_ATTEMPT_MIN_TIMEOUT, CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT,
+            CQL_ATTEMPTS_TIMEOUT_INCREMENT, log);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
new file mode 100644
index 0000000..867f58d
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+
+/**
+ * Provides information for single operations (load, delete, write) of Ignite cache
+ * backed by {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}.
+ * <p>
+ * An implementation supplies the CQL statement text, binds its parameters, names the operation
+ * and converts the resulting row into the operation result.
+ *
+ * @param <R> Type of the result returned from operation.
+ */
+public interface ExecutionAssistant<R> {
+    /**
+     * Indicates if Cassandra table existence is required for operation.
+     *
+     * @return {@code true} if table existence is required.
+     */
+    public boolean tableExistenceRequired();
+
+    /**
+     * Returns CQL statement to be used for operation.
+     *
+     * @return CQL statement text.
+     */
+    public String getStatement();
+
+    /**
+     * Binds prepared statement.
+     *
+     * @param statement Prepared statement to bind.
+     *
+     * @return Bound statement.
+     */
+    public BoundStatement bindStatement(PreparedStatement statement);
+
+    /**
+     * Persistence settings to use for operation.
+     *
+     * @return Persistence settings.
+     */
+    public KeyValuePersistenceSettings getPersistenceSettings();
+
+    /**
+     * Returns operation name.
+     *
+     * @return Operation name.
+     */
+    public String operationName();
+
+    /**
+     * Processes Cassandra database table row returned by specified CQL statement.
+     *
+     * @param row Cassandra database table row.
+     *
+     * @return Result of the operation.
+     */
+    public R process(Row row);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
new file mode 100644
index 0000000..17494dd
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.Row;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Base implementation of the {@link org.apache.ignite.cache.store.cassandra.session.BatchExecutionAssistant}
+ * which remembers sequence numbers of already processed rows, so that each of them is handled only once.
+ *
+ * @param <R> Type of the result returned from batch operation.
+ * @param <V> Type of the value used in batch operation.
+ */
+public abstract class GenericBatchExecutionAssistant<R, V> implements BatchExecutionAssistant<R, V> {
+    /** Sequence numbers of already processed objects. */
+    private Set<Integer> processed = new HashSet<>();
+
+    /** {@inheritDoc} */
+    @Override public void process(Row row, int seqNum) {
+        if (!processed.contains(seqNum)) {
+            process(row);
+
+            // Marked as processed only after the row was handled without an exception.
+            processed.add(seqNum);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean alreadyProcessed(int seqNum) {
+        return processed.contains(seqNum);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int processedCount() {
+        return processed.size();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation produces no data and returns {@code null}.
+     */
+    @Override public R processedData() {
+        return null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation returns {@code false}.
+     */
+    @Override public boolean tableExistenceRequired() {
+        return false;
+    }
+
+    /**
+     * Processes particular row inside batch operation. Does nothing by default.
+     *
+     * @param row Row to process.
+     */
+    protected void process(Row row) {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
new file mode 100644
index 0000000..d3ace7d
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
+import org.apache.ignite.lang.IgniteBiInClosure;
+
+/**
+ * Worker which loads cache entries using custom user CQL query: every row returned by the query
+ * is converted to key and value objects and passed to the closure.
+ *
+ * @param <K> Key type.
+ * @param <V> Value type.
+ */
+public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> {
+    /** Cassandra session to execute CQL query. */
+    private final CassandraSession ses;
+
+    /** User query, always stored with trailing {@code ;}. */
+    private final String qry;
+
+    /** Persistence controller. */
+    private final PersistenceController ctrl;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Closure for loaded values. */
+    private final IgniteBiInClosure<K, V> clo;
+
+    /**
+     * @param ses Cassandra session to execute CQL query.
+     * @param qry User query, {@code ;} is appended if the query doesn't end with it.
+     * @param ctrl Persistence controller used to build key and value objects from a row.
+     * @param log Logger.
+     * @param clo Closure for loaded values.
+     */
+    public LoadCacheCustomQueryWorker(CassandraSession ses, String qry, PersistenceController ctrl,
+        IgniteLogger log, IgniteBiInClosure<K, V> clo) {
+        this.ses = ses;
+
+        if (qry.trim().endsWith(";"))
+            this.qry = qry;
+        else
+            this.qry = qry + ";";
+
+        this.ctrl = ctrl;
+        this.log = log;
+        this.clo = clo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Void call() throws Exception {
+        ses.execute(new BatchLoaderAssistant() {
+            /** {@inheritDoc} */
+            @Override public String operationName() {
+                return "loadCache";
+            }
+
+            /** {@inheritDoc} */
+            @Override public Statement getStatement() {
+                return new SimpleStatement(qry);
+            }
+
+            /** {@inheritDoc} */
+            @Override public void process(Row row) {
+                final K key;
+
+                try {
+                    key = (K)ctrl.buildKeyObject(row);
+                }
+                catch (Throwable e) {
+                    String msg = "Failed to build Ignite key object from provided Cassandra row";
+
+                    log.error(msg, e);
+
+                    throw new IgniteException(msg, e);
+                }
+
+                final V val;
+
+                try {
+                    val = (V)ctrl.buildValueObject(row);
+                }
+                catch (Throwable e) {
+                    String msg = "Failed to build Ignite value object from provided Cassandra row";
+
+                    log.error(msg, e);
+
+                    throw new IgniteException(msg, e);
+                }
+
+                clo.apply(key, val);
+            }
+        });
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java
new file mode 100644
index 0000000..ecbbe78
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains classes responsible for handling sessions and communication with Cassandra
+ */
+package org.apache.ignite.cache.store.cassandra.session;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
new file mode 100644
index 0000000..fc4a907
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.pool;
+
+import com.datastax.driver.core.Session;
+import java.lang.Thread.State;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
+
+/**
+ * Cassandra driver sessions pool.
+ * <p>
+ * Driver sessions returned to the pool are kept until they are either taken back or expire.
+ * Expired sessions are released by a background daemon thread, which is started when a session is put
+ * into the pool and exits as soon as the pool has no live sessions left.
+ */
+public class SessionPool {
+    /** Sessions monitor sleep timeout. */
+    private static final long SLEEP_TIMEOUT = 60000; // 1 minute.
+
+    /** Sessions which were returned to pool. Also used as the monitor guarding the pool state. */
+    private static final Map<CassandraSessionImpl, SessionWrapper> sessions = new HashMap<>();
+
+    /** Singleton instance of the monitor thread. */
+    private static SessionMonitor monitorSingleton;
+
+    static {
+        // Make sure pooled sessions are released on JVM shutdown.
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override public void run() {
+                release();
+            }
+        });
+    }
+
+    /**
+     * Monitors session pool and closes unused session.
+     */
+    private static class SessionMonitor extends Thread {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            try {
+                for (;;) {
+                    try {
+                        Thread.sleep(SLEEP_TIMEOUT);
+                    }
+                    catch (InterruptedException ignored) {
+                        // Interruption is a request to stop monitoring.
+                        return;
+                    }
+
+                    List<Map.Entry<CassandraSessionImpl, SessionWrapper>> expired = new LinkedList<>();
+
+                    int totalCnt;
+
+                    synchronized (sessions) {
+                        // Pool size before expired sessions are removed.
+                        totalCnt = sessions.size();
+
+                        for (Map.Entry<CassandraSessionImpl, SessionWrapper> e : sessions.entrySet()) {
+                            if (e.getValue().expired())
+                                expired.add(e);
+                        }
+
+                        for (Map.Entry<CassandraSessionImpl, SessionWrapper> e : expired)
+                            sessions.remove(e.getKey());
+                    }
+
+                    // Release outside of the lock.
+                    for (Map.Entry<CassandraSessionImpl, SessionWrapper> e : expired)
+                        e.getValue().release();
+
+                    // Every session in the pool expired, thus no thread is needed to manage the pool anymore.
+                    if (totalCnt == expired.size())
+                        return;
+                }
+            }
+            finally {
+                release();
+            }
+        }
+    }
+
+    /**
+     * Returns Cassandra driver session to sessions pool. Starts the monitor thread if it isn't running.
+     * Driver session previously pooled for the same wrapper, if any, is released.
+     *
+     * @param cassandraSes Session wrapper.
+     * @param driverSes Driver session.
+     */
+    public static void put(CassandraSessionImpl cassandraSes, Session driverSes) {
+        if (cassandraSes == null || driverSes == null)
+            return;
+
+        SessionWrapper prev;
+
+        synchronized (sessions) {
+            prev = sessions.put(cassandraSes, new SessionWrapper(driverSes));
+
+            boolean monitorStopped = monitorSingleton == null || monitorSingleton.getState() == State.TERMINATED;
+
+            if (monitorStopped) {
+                monitorSingleton = new SessionMonitor();
+                monitorSingleton.setDaemon(true);
+                monitorSingleton.setName("Cassandra-sessions-pool");
+                monitorSingleton.start();
+            }
+        }
+
+        if (prev != null)
+            prev.release();
+    }
+
+    /**
+     * Extracts Cassandra driver session from pool.
+     *
+     * @param cassandraSes Session wrapper.
+     * @return Cassandra driver session or {@code null} if there is no pooled session for the wrapper.
+     */
+    public static Session get(CassandraSessionImpl cassandraSes) {
+        if (cassandraSes == null)
+            return null;
+
+        SessionWrapper wrapper;
+
+        synchronized (sessions) {
+            wrapper = sessions.remove(cassandraSes);
+        }
+
+        if (wrapper != null)
+            return wrapper.driverSession();
+
+        return null;
+    }
+
+    /**
+     * Releases all session from pool and closes all their connections to Cassandra database.
+     */
+    public static void release() {
+        Collection<SessionWrapper> toRelease;
+
+        synchronized (sessions) {
+            try {
+                if (sessions.isEmpty())
+                    return;
+
+                // Snapshot of pooled sessions, they are released outside of the lock.
+                toRelease = new LinkedList<>(sessions.values());
+
+                sessions.clear();
+            }
+            finally {
+                boolean calledByMonitor = Thread.currentThread() instanceof SessionMonitor;
+
+                // Monitor thread doesn't interrupt itself.
+                if (!calledByMonitor && monitorSingleton != null) {
+                    try {
+                        monitorSingleton.interrupt();
+                    }
+                    catch (Throwable ignored) {
+                        // Best effort.
+                    }
+                }
+            }
+        }
+
+        for (SessionWrapper wrapper : toRelease)
+            wrapper.release();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
new file mode 100644
index 0000000..7c5722b
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.pool;
+
+import com.datastax.driver.core.Session;
+import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
+
+/**
+ * Wrapper for Cassandra driver session, responsible for monitoring session expiration and its closing.
+ */
+public class SessionWrapper {
+    /** Expiration timeout for Cassandra driver session. */
+    public static final long DFLT_EXPIRATION_TIMEOUT = 300000;  // 5 minutes.
+
+    /** Cassandra driver session. */
+    private Session ses;
+
+    /** Wrapper creation time.  */
+    private long time;
+
+    /**
+     * Creates instance of Cassandra driver session wrapper.
+     *
+     * @param ses Cassandra driver session.
+     */
+    public SessionWrapper(Session ses) {
+        this.ses = ses;
+        this.time = System.currentTimeMillis();
+    }
+
+    /**
+     * Checks if Cassandra driver session expired.
+     *
+     * @return true if session expired.
+     */
+    public boolean expired() {
+        return System.currentTimeMillis() - time > DFLT_EXPIRATION_TIMEOUT;
+    }
+
+    /**
+     * Returns wrapped Cassandra driver session.
+     *
+     * @return Cassandra driver session.
+     */
+    public Session driverSession() {
+        return ses;
+    }
+
+    /**
+     * Closes wrapped Cassandra driver session
+     */
+    public void release() {
+        CassandraHelper.closeSession(ses);
+        ses = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java
new file mode 100644
index 0000000..21c292f
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains session pool implementation for Cassandra sessions
+ */
+package org.apache.ignite.cache.store.cassandra.session.pool;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
new file mode 100644
index 0000000..4f40478
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.utils;
+
+import java.io.File;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+
+/**
+ * Generates Cassandra DDL statements from persistence descriptor xml file.
+ */
+public class DDLGenerator {
+    /** Separator line printed above and below every header message. */
+    private static final String SEPARATOR = "-------------------------------------------------------------";
+
+    /**
+     * DDLGenerator entry point. Treats every argument as a path to a persistence descriptor file and prints
+     * the keyspace and table DDL statements for it; arguments which are not files, or which fail to be
+     * processed, are reported and skipped.
+     *
+     * @param args Arguments for DDLGenerator.
+     */
+    public static void main(String[] args) {
+        if (args == null || args.length == 0)
+            return;
+
+        for (String arg : args) {
+            File descriptor = new File(arg);
+
+            if (!descriptor.isFile()) {
+                printHeader("Incorrect file specified: " + arg);
+
+                continue;
+            }
+
+            try {
+                KeyValuePersistenceSettings settings = new KeyValuePersistenceSettings(descriptor);
+
+                printHeader("DDL for keyspace/table from file: " + arg);
+
+                System.out.println();
+                System.out.println(settings.getKeyspaceDDLStatement());
+                System.out.println();
+                System.out.println(settings.getTableDDLStatement());
+                System.out.println();
+            }
+            catch (Throwable e) {
+                // Any failure while loading or rendering the descriptor is reported and the next file is tried.
+                printHeader("Incorrect file specified: " + arg);
+
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Prints the message to standard output framed by separator lines.
+     *
+     * @param msg Message to print.
+     */
+    private static void printHeader(String msg) {
+        System.out.println(SEPARATOR);
+        System.out.println(msg);
+        System.out.println(SEPARATOR);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java
new file mode 100644
index 0000000..2460dfe
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains utility classes
+ */
+package org.apache.ignite.cache.store.cassandra.utils;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/bootstrap/aws/README.txt
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/bootstrap/aws/README.txt b/modules/cassandra/store/src/test/bootstrap/aws/README.txt
new file mode 100644
index 0000000..a61b235
--- /dev/null
+++ b/modules/cassandra/store/src/test/bootstrap/aws/README.txt
@@ -0,0 +1,13 @@
+Shell scripts to spin up Ignite, Cassandra and Load tests clusters in AWS.
+
+1) cassandra - bootstrap scripts for Cassandra cluster nodes
+2) ganglia - bootstrap scripts for Ganglia master and agents
+3) ignite - bootstrap scripts for Ignite cluster nodes
+4) tests - bootstrap scripts for Load Tests cluster nodes
+5) common.sh - definitions for common functions
+6) env.sh - definitions for common variables
+7) logs-collector.sh - logs collector daemon script, to collect logs and upload them to S3
+
+For more details please look at the documentation:
+
+    https://apacheignite.readme.io/docs/aws-infrastructure-deployment
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh b/modules/cassandra/store/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh
new file mode 100644
index 0000000..017b1b1
--- /dev/null
+++ b/modules/cassandra/store/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh
@@ -0,0 +1,336 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# -----------------------------------------------------------------------------------------------
+# Bootstrap script to spin up Cassandra cluster
+# -----------------------------------------------------------------------------------------------
+
+# URL to download AWS CLI tools
+AWS_CLI_DOWNLOAD_URL=https://s3.amazonaws.com/aws-cli/awscli-bundle.zip
+
+# URL to download JDK
+JDK_DOWNLOAD_URL=http://download.oracle.com/otn-pub/java/jdk/8u77-b03/jdk-8u77-linux-x64.tar.gz
+
+# URL to download Ignite-Cassandra tests package - you should previously package and upload it to this place
+TESTS_PACKAGE_DONLOAD_URL=s3://<bucket>/<folder>/ignite-cassandra-tests-<version>.zip
+
+# Terminates script execution and reports the bootstrap result to S3.
+#   $1 - error message; when empty the bootstrap is treated as successful.
+# Exits with code 1 when an error message was passed and with code 0 otherwise.
+# The report is uploaded only if the corresponding S3_CASSANDRA_BOOTSTRAP_SUCCESS /
+# S3_CASSANDRA_BOOTSTRAP_FAILURE variable is not empty.
+terminate()
+{
+    SUCCESS_URL=$S3_CASSANDRA_BOOTSTRAP_SUCCESS
+    FAILURE_URL=$S3_CASSANDRA_BOOTSTRAP_FAILURE
+
+    # Append a trailing '/' to non-empty report URLs which lack it. 'case' is used instead of
+    # '[[ ... ]]' because the script is started through /bin/sh, where '[[' is not guaranteed to exist.
+    case "$SUCCESS_URL" in
+        ""|*/) ;;
+        *) SUCCESS_URL=${SUCCESS_URL}/ ;;
+    esac
+
+    case "$FAILURE_URL" in
+        ""|*/) ;;
+        *) FAILURE_URL=${FAILURE_URL}/ ;;
+    esac
+
+    host_name=$(hostname -f | tr '[:upper:]' '[:lower:]')
+    msg=$host_name
+
+    if [ -n "$1" ]; then
+        echo "[ERROR] $1"
+        echo "[ERROR]-----------------------------------------------------"
+        echo "[ERROR] Cassandra node bootstrap failed"
+        echo "[ERROR]-----------------------------------------------------"
+        msg=$1
+
+        # No place to report the failure to
+        if [ -z "$FAILURE_URL" ]; then
+            exit 1
+        fi
+
+        reportFolder=${FAILURE_URL}${host_name}
+        reportFile=$reportFolder/__error__
+    else
+        echo "[INFO]-----------------------------------------------------"
+        echo "[INFO] Cassandra node bootstrap successfully completed"
+        echo "[INFO]-----------------------------------------------------"
+
+        # No place to report the success to
+        if [ -z "$SUCCESS_URL" ]; then
+            exit 0
+        fi
+
+        reportFolder=${SUCCESS_URL}${host_name}
+        reportFile=$reportFolder/__success__
+    fi
+
+    # Quoted so that the message is written exactly as passed (no word splitting or globbing)
+    echo "$msg" > /opt/bootstrap-result
+
+    # Remove the previous report of this host before uploading the new one
+    aws s3 rm --recursive "$reportFolder"
+    if [ $? -ne 0 ]; then
+        echo "[ERROR] Failed to drop report folder: $reportFolder"
+    fi
+
+    aws s3 cp --sse AES256 /opt/bootstrap-result "$reportFile"
+    if [ $? -ne 0 ]; then
+        echo "[ERROR] Failed to report bootstrap result to: $reportFile"
+    fi
+
+    rm -f /opt/bootstrap-result
+
+    if [ -n "$1" ]; then
+        exit 1
+    fi
+
+    exit 0
+}
+
+# Downloads specified package
+downloadPackage()
+{
+    echo "[INFO] Downloading $3 package from $1 into $2"
+
+    for i in 0 9;
+    do
+        if [[ "$1" == s3* ]]; then
+            aws s3 cp $1 $2
+            code=$?
+        else
+            curl "$1" -o "$2"
+            code=$?
+        fi
+
+        if [ $code -eq 0 ]; then
+            echo "[INFO] $3 package successfully downloaded from $1 into $2"
+            return 0
+        fi
+
+        echo "[WARN] Failed to download $3 package from $i attempt, sleeping extra 5sec"
+        sleep 5s
+    done
+
+    terminate "All 10 attempts to download $3 package from $1 are failed"
+}
+
+# Downloads the JDK tarball from $JDK_DOWNLOAD_URL and installs it into /opt/java
+setupJava()
+{
+    # Start from a clean state
+    rm -Rf /opt/java /opt/jdk.tar.gz
+
+    echo "[INFO] Downloading 'jdk'"
+    wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "$JDK_DOWNLOAD_URL" -O /opt/jdk.tar.gz \
+        || terminate "Failed to download 'jdk'"
+
+    echo "[INFO] Untaring 'jdk'"
+    tar -xvzf /opt/jdk.tar.gz -C /opt || terminate "Failed to untar 'jdk'"
+
+    rm -Rf /opt/jdk.tar.gz
+
+    # Rename the unpacked directory (whatever in /opt matches "jdk") to /opt/java
+    unzipDir=$(ls /opt | grep "jdk")
+    [ "$unzipDir" = "java" ] || mv /opt/$unzipDir /opt/java
+}
+
+# Installs AWS CLI: first through pip and, if that fails, from the zip bundle at $AWS_CLI_DOWNLOAD_URL
+setupAWSCLI()
+{
+    echo "[INFO] Installing 'awscli'"
+
+    # Preferred way: pip. Nothing else to do when it succeeds.
+    if pip install --upgrade awscli; then
+        return 0
+    fi
+
+    echo "[ERROR] Failed to install 'awscli' using pip"
+    echo "[INFO] Trying to install awscli using zip archive"
+    echo "[INFO] Downloading awscli zip"
+
+    downloadPackage "$AWS_CLI_DOWNLOAD_URL" "/opt/awscli-bundle.zip" "awscli"
+
+    echo "[INFO] Unzipping awscli zip"
+    unzip /opt/awscli-bundle.zip -d /opt || terminate "Failed to unzip awscli zip"
+
+    rm -Rf /opt/awscli-bundle.zip
+
+    echo "[INFO] Installing awscli"
+    /opt/awscli-bundle/install -i /usr/local/aws -b /usr/local/bin/aws || terminate "Failed to install awscli"
+
+    echo "[INFO] Successfully installed awscli from zip archive"
+}
+
+# Installs the prerequisites: the wget, net-tools, python and unzip packages (through yum) and pip
+setupPreRequisites()
+{
+    # Packages are installed one by one, in this order; the first failure terminates the bootstrap
+    for yumPkg in wget net-tools python unzip
+    do
+        echo "[INFO] Installing '$yumPkg' package"
+        yum -y install $yumPkg || terminate "Failed to install '$yumPkg' package"
+    done
+
+    downloadPackage "https://bootstrap.pypa.io/get-pip.py" "/opt/get-pip.py" "get-pip.py"
+
+    echo "[INFO] Installing 'pip'"
+    python /opt/get-pip.py || terminate "Failed to install 'pip'"
+}
+
+# Downloads the tests package from $TESTS_PACKAGE_DONLOAD_URL, unpacks it into
+# /opt/ignite-cassandra-tests, sources its common.sh and starts the logs collector daemon.
+# NOTE(review): setupNTP, printInstanceInfo, tagInstance and bootstrapGangliaAgent are not defined in
+# this script - presumably they come from the common.sh sourced below; confirm.
+setupTestsPackage()
+{
+    downloadPackage "$TESTS_PACKAGE_DONLOAD_URL" "/opt/ignite-cassandra-tests.zip" "Tests"
+
+    # Remove the previously unpacked package, if any
+    rm -Rf /opt/ignite-cassandra-tests
+
+    unzip /opt/ignite-cassandra-tests.zip -d /opt
+    if [ $? -ne 0 ]; then
+        terminate "Failed to unzip tests package"
+    fi
+
+    rm -f /opt/ignite-cassandra-tests.zip
+
+    # Rename the unpacked directory unless it is already called 'ignite-cassandra-tests'
+    unzipDir=$(ls /opt | grep "ignite-cassandra")
+    if [ "$unzipDir" != "ignite-cassandra-tests" ]; then
+        mv /opt/$unzipDir /opt/ignite-cassandra-tests
+    fi
+
+    # Make every shell script of the package executable for its owner and group
+    find /opt/ignite-cassandra-tests -type f -name "*.sh" -exec chmod ug+x {} \;
+
+    # Source the shared definitions into the current shell, passing "cassandra" as argument
+    . /opt/ignite-cassandra-tests/bootstrap/aws/common.sh "cassandra"
+
+    setupNTP
+
+    echo "[INFO] Starting logs collector daemon"
+
+    # The collector is started in background; its output goes to /opt/logs-collector.log
+    HOST_NAME=$(hostname -f | tr '[:upper:]' '[:lower:]')
+    /opt/ignite-cassandra-tests/bootstrap/aws/logs-collector.sh "$S3_LOGS_TRIGGER" "$S3_CASSANDRA_LOGS/$HOST_NAME" "/opt/cassandra/logs" "/opt/cassandra/cassandra-start.log" > /opt/logs-collector.log &
+
+    # $! is the PID of the background job started above
+    echo "[INFO] Logs collector daemon started: $!"
+
+    echo "----------------------------------------------------------------------------------------"
+    printInstanceInfo
+    echo "----------------------------------------------------------------------------------------"
+    tagInstance
+    bootstrapGangliaAgent "cassandra" 8641
+}
+
+# Downloads the Cassandra tarball from $CASSANDRA_DOWNLOAD_URL and unpacks it into /opt/cassandra
+downloadCassandra()
+{
+    downloadPackage "$CASSANDRA_DOWNLOAD_URL" "/opt/apache-cassandra.tar.gz" "Cassandra"
+
+    # Drop the previous installation, if any, before unpacking
+    rm -Rf /opt/cassandra
+
+    echo "[INFO] Untaring Cassandra package"
+    tar -xvzf /opt/apache-cassandra.tar.gz -C /opt || terminate "Failed to untar Cassandra package"
+
+    rm -f /opt/apache-cassandra.tar.gz
+
+    # Rename the unpacked directory (an /opt entry matching both "cassandra" and "apache") to /opt/cassandra
+    unzipDir=$(ls /opt | grep "cassandra" | grep "apache")
+    [ "$unzipDir" = "cassandra" ] || mv /opt/$unzipDir /opt/cassandra
+}
+
+# Sets up Cassandra: creates the 'cassandra' group and user, installs the configuration files shipped
+# with the tests package, fills in the directory placeholders of the yaml template and appends the
+# environment exports to a profile file.
+#   $1 - file to append the JAVA_HOME / CASSANDRA_HOME / PATH exports to
+# NOTE(review): createCassandraStorageLayout and the CASSANDRA_*_DIR variables are not defined in this
+# script - presumably they are provided by the sourced common.sh; confirm.
+setupCassandra()
+{
+    echo "[INFO] Creating 'cassandra' group"
+    # Any /etc/group line containing "cassandra" counts as "group already exists"
+    exists=$(cat /etc/group | grep cassandra)
+    if [ -z "$exists" ]; then
+        groupadd cassandra
+        if [ $? -ne 0 ]; then
+            terminate "Failed to create 'cassandra' group"
+        fi
+    fi
+
+    echo "[INFO] Creating 'cassandra' user"
+    # Any /etc/passwd line containing "cassandra" counts as "user already exists"
+    exists=$(cat /etc/passwd | grep cassandra)
+    if [ -z "$exists" ]; then
+        useradd -g cassandra cassandra
+        if [ $? -ne 0 ]; then
+            terminate "Failed to create 'cassandra' user"
+        fi
+    fi
+
+    # Replace the stock configuration with the files shipped in the tests package
+    rm -f /opt/cassandra/conf/cassandra-env.sh /opt/cassandra/conf/cassandra-template.yaml
+
+    cp /opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-env.sh /opt/cassandra/conf
+    cp /opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-template.yaml /opt/cassandra/conf
+
+    chown -R cassandra:cassandra /opt/cassandra /opt/ignite-cassandra-tests
+
+    createCassandraStorageLayout
+
+    # Substitute the ${CASSANDRA_DATA_DIR}, ${CASSANDRA_COMMITLOG_DIR} and ${CASSANDRA_CACHES_DIR}
+    # placeholders one by one, each pass writing a new intermediate file.
+    # NOTE(review): the values are pasted unescaped into an s/.../.../ expression which uses '/' as
+    # delimiter - presumably they are already escaped by the time this runs; verify.
+    cat /opt/cassandra/conf/cassandra-template.yaml | sed -r "s/\\\$\{CASSANDRA_DATA_DIR\}/$CASSANDRA_DATA_DIR/g" > /opt/cassandra/conf/cassandra-template-1.yaml
+    cat /opt/cassandra/conf/cassandra-template-1.yaml | sed -r "s/\\\$\{CASSANDRA_COMMITLOG_DIR\}/$CASSANDRA_COMMITLOG_DIR/g" > /opt/cassandra/conf/cassandra-template-2.yaml
+    cat /opt/cassandra/conf/cassandra-template-2.yaml | sed -r "s/\\\$\{CASSANDRA_CACHES_DIR\}/$CASSANDRA_CACHES_DIR/g" > /opt/cassandra/conf/cassandra-template-3.yaml
+
+    # Keep only the final result, under the original template name
+    rm -f /opt/cassandra/conf/cassandra-template.yaml /opt/cassandra/conf/cassandra-template-1.yaml /opt/cassandra/conf/cassandra-template-2.yaml
+    mv /opt/cassandra/conf/cassandra-template-3.yaml /opt/cassandra/conf/cassandra-template.yaml
+
+    # \$ keeps the variable references literal, so they are expanded when the profile is loaded
+    echo "export JAVA_HOME=/opt/java" >> $1
+    echo "export CASSANDRA_HOME=/opt/cassandra" >> $1
+    echo "export PATH=\$JAVA_HOME/bin:\$CASSANDRA_HOME/bin:\$PATH" >> $1
+}
+
+###################################################################################################################
+
+echo "[INFO]-----------------------------------------------------------------"
+echo "[INFO] Bootstrapping Cassandra node"
+echo "[INFO]-----------------------------------------------------------------"
+
+setupPreRequisites
+setupJava
+setupAWSCLI
+setupTestsPackage
+downloadCassandra
+setupCassandra "/root/.bash_profile"
+
+cmd="/opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-start.sh"
+
+#sudo -u cassandra -g cassandra sh -c "$cmd | tee /opt/cassandra/cassandra-start.log"
+
+$cmd | tee /opt/cassandra/cassandra-start.log
\ No newline at end of file


Mime
View raw message