ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptupit...@apache.org
Subject [26/35] ignite git commit: IGNITE-3172 Refactoring Ignite-Cassandra serializers. - Fixes #956.
Date Wed, 14 Sep 2016 10:53:31 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
deleted file mode 100644
index 95b8581..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
+++ /dev/null
@@ -1,832 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.session;
-
-import com.datastax.driver.core.BatchStatement;
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.Statement;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import com.datastax.driver.core.exceptions.InvalidQueryException;
-import com.datastax.driver.core.querybuilder.Batch;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.cache.Cache;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
-import org.apache.ignite.cache.store.cassandra.common.RandomSleeper;
-import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
-import org.apache.ignite.cache.store.cassandra.session.pool.SessionPool;
-import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
-
-/**
- * Implementation for {@link org.apache.ignite.cache.store.cassandra.session.CassandraSession}.
- */
-public class CassandraSessionImpl implements CassandraSession {
-    /** Number of CQL query execution attempts. */
-    private static final int CQL_EXECUTION_ATTEMPTS_COUNT = 20;
-
-    /** Min timeout between CQL query execution attempts (handed to the random sleeper; presumably milliseconds - confirm). */
-    private static final int CQL_EXECUTION_ATTEMPT_MIN_TIMEOUT = 100;
-
-    /**
-     * Max timeout between CQL query execution attempts, in milliseconds. Also used as the extra sleep
-     * interval while handling a host availability problem.
-     */
-    private static final int CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT = 500;
-
-    /** Timeout increment for CQL query execution attempts (handed to the random sleeper). */
-    private static final int CQL_ATTEMPTS_TIMEOUT_INCREMENT = 100;
-
-    /** Cassandra cluster builder. */
-    private volatile Cluster.Builder builder;
-
-    /** Cassandra driver session; {@code null} until first use and after it was returned to the session pool. */
-    private volatile Session ses;
-
-    /** Number of references to Cassandra driver session (for multithreaded environment). */
-    private volatile int refCnt = 0;
-
-    /**
-     * Storage for the session prepared statements, keyed by CQL text. Every access is made while holding
-     * the monitor of this map.
-     * NOTE(review): the map is static, so it is shared by all instances of this class, while it is cleared
-     * whenever any single instance opens or refreshes its session - confirm that this sharing is intended.
-     */
-    private static final Map<String, PreparedStatement> sesStatements = new HashMap<>();
-
-    /** Number of records to immediately fetch in CQL statement execution; {@code null} leaves the statement untouched. */
-    private Integer fetchSize;
-
-    /** Consistency level for Cassandra READ operations (select); {@code null} leaves the statement untouched. */
-    private ConsistencyLevel readConsistency;
-
-    /** Consistency level for Cassandra WRITE operations (insert/update/delete); {@code null} leaves the statement untouched. */
-    private ConsistencyLevel writeConsistency;
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /**
-     * Table absence error handlers counter. Starts at -1 so that the first handler to increment it
-     * observes 0; it is reset to -1 by that handler once it finishes.
-     */
-    private final AtomicInteger tblAbsenceHandlersCnt = new AtomicInteger(-1);
-
-    /**
-     * Prepared statement cluster disconnection error handlers counter. Starts at -1 so that the first
-     * handler to increment it observes 0; it is reset to -1 by that handler once it finishes.
-     */
-    private final AtomicInteger prepStatementHandlersCnt = new AtomicInteger(-1);
-
-    /**
-     * Builds a wrapper around the Cassandra driver session. The constructor only stores its arguments;
-     * the driver session itself is obtained later, on first use.
-     *
-     * @param builder Builder for Cassandra cluster.
-     * @param fetchSize Number of rows to immediately fetch in CQL statement execution.
-     * @param readConsistency Consistency level for Cassandra READ operations (select).
-     * @param writeConsistency Consistency level for Cassandra WRITE operations (insert/update/delete).
-     * @param log Logger.
-     */
-    public CassandraSessionImpl(Cluster.Builder builder, Integer fetchSize, ConsistencyLevel readConsistency,
-        ConsistencyLevel writeConsistency, IgniteLogger log) {
-        // Diagnostics first, then connection settings, then per-statement tuning options.
-        this.log = log;
-        this.builder = builder;
-        this.readConsistency = readConsistency;
-        this.writeConsistency = writeConsistency;
-        this.fetchSize = fetchSize;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Makes up to {@code CQL_EXECUTION_ATTEMPTS_COUNT} attempts, sleeping between them. Returns {@code null}
-     * when the statement could not be prepared, when the result set is empty, or when the table is absent and
-     * the assistant does not require it. Only the first row of the result set is handed to the assistant.
-     */
-    @Override public <V> V execute(ExecutionAssistant<V> assistant) {
-        int attempt = 0;
-        Throwable error = null;
-        String errorMsg = "Failed to execute Cassandra CQL statement: " + assistant.getStatement();
-
-        RandomSleeper sleeper = newSleeper();
-
-        // Keep the session referenced for the whole operation; released in the finally block below.
-        incrementSessionRefs();
-
-        try {
-            while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
-                error = null;
-
-                if (attempt != 0) {
-                    log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra CQL statement: " +
-                            assistant.getStatement());
-                }
-
-                try {
-                    PreparedStatement preparedSt = prepareStatement(assistant.getStatement(),
-                        assistant.getPersistenceSettings(), assistant.tableExistenceRequired());
-
-                    // prepareStatement() yields null when the table is absent and not required by the operation.
-                    if (preparedSt == null)
-                        return null;
-
-                    Statement statement = tuneStatementExecutionOptions(assistant.bindStatement(preparedSt));
-                    ResultSet res = session().execute(statement);
-
-                    Row row = res == null || !res.iterator().hasNext() ? null : res.iterator().next();
-
-                    return row == null ? null : assistant.process(row);
-                }
-                catch (Throwable e) {
-                    error = e;
-
-                    if (CassandraHelper.isTableAbsenceError(e)) {
-                        if (!assistant.tableExistenceRequired()) {
-                            log.warning(errorMsg, e);
-                            return null;
-                        }
-
-                        handleTableAbsenceError(assistant.getPersistenceSettings());
-                    }
-                    else if (CassandraHelper.isHostsAvailabilityError(e))
-                        handleHostsAvailabilityError(e, attempt, errorMsg);
-                    else if (CassandraHelper.isPreparedStatementClusterError(e))
-                        handlePreparedStatementClusterError(e);
-                    else
-                        // For an error which we don't know how to handle, we will not try next attempts and terminate.
-                        // Note: this exception is caught by the outer catch below and wrapped once more.
-                        throw new IgniteException(errorMsg, e);
-                }
-
-                sleeper.sleep();
-
-                attempt++;
-            }
-        }
-        catch (Throwable e) {
-            // Covers both the exception thrown above and any failure raised by the error handlers.
-            error = e;
-        }
-        finally {
-            decrementSessionRefs();
-        }
-
-        // Reached only on failure: either all attempts were used up or a non-recoverable error occurred.
-        log.error(errorMsg, error);
-
-        throw new IgniteException(errorMsg, error);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Each attempt has two phases: first every not yet processed element is bound and submitted
-     * asynchronously, then the collected futures are awaited and their first rows are handed to the
-     * assistant. Elements already processed in an earlier attempt are skipped. Up to
-     * {@code CQL_EXECUTION_ATTEMPTS_COUNT} attempts are made, sleeping between them.
-     *
-     * @param assistant Batch assistant providing the statement, binding and result processing.
-     * @param data Elements to process; {@code null} or empty data returns the assistant's current result.
-     * @return Result accumulated by the assistant, or {@code null} if the statement could not be prepared.
-     */
-    @Override public <R, V> R execute(BatchExecutionAssistant<R, V> assistant, Iterable<? extends V> data) {
-        if (data == null || !data.iterator().hasNext())
-            return assistant.processedData();
-
-        int attempt = 0;
-        String errorMsg = "Failed to execute Cassandra " + assistant.operationName() + " operation";
-        Throwable error = new IgniteException(errorMsg);
-
-        RandomSleeper sleeper = newSleeper();
-
-        int dataSize = 0;
-
-        // Keep the session referenced for the whole operation; released in the finally block below.
-        incrementSessionRefs();
-
-        try {
-            while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
-                if (attempt != 0) {
-                    log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra batch " +
-                            assistant.operationName() + " operation to process rest " +
-                            (dataSize - assistant.processedCount()) + " of " + dataSize + " elements");
-                }
-
-                // Clean errors info before next communication with Cassandra.
-                Throwable unknownEx = null;
-                Throwable tblAbsenceEx = null;
-                Throwable hostsAvailEx = null;
-                Throwable prepStatEx = null;
-
-                List<Cache.Entry<Integer, ResultSetFuture>> futResults = new LinkedList<>();
-
-                PreparedStatement preparedSt = prepareStatement(assistant.getStatement(),
-                    assistant.getPersistenceSettings(), assistant.tableExistenceRequired());
-
-                // NOTE(review): other "table absent and not required" exits of this method return
-                // assistant.processedData(); this one returns null. Kept as is for caller compatibility.
-                if (preparedSt == null)
-                    return null;
-
-                int seqNum = 0;
-
-                // Phase 1: bind and asynchronously submit every element not processed yet.
-                for (V obj : data) {
-                    if (!assistant.alreadyProcessed(seqNum)) {
-                        try {
-                            Statement statement = tuneStatementExecutionOptions(assistant.bindStatement(preparedSt, obj));
-                            ResultSetFuture fut = session().executeAsync(statement);
-                            futResults.add(new CacheEntryImpl<>(seqNum, fut));
-                        }
-                        catch (Throwable e) {
-                            if (CassandraHelper.isTableAbsenceError(e)) {
-                                // If there are table absence error and it is not required for the operation we can return.
-                                if (!assistant.tableExistenceRequired())
-                                    return assistant.processedData();
-
-                                tblAbsenceEx = e;
-                                handleTableAbsenceError(assistant.getPersistenceSettings());
-                            }
-                            else if (CassandraHelper.isHostsAvailabilityError(e)) {
-                                // Handle host availability only once per attempt. The check has to happen
-                                // BEFORE the error is remembered, otherwise the handler would never run.
-                                if (hostsAvailEx == null)
-                                    handleHostsAvailabilityError(e, attempt, errorMsg);
-
-                                hostsAvailEx = e;
-                            }
-                            else if (CassandraHelper.isPreparedStatementClusterError(e)) {
-                                prepStatEx = e;
-                                handlePreparedStatementClusterError(e);
-                            }
-                            else
-                                unknownEx = e;
-                        }
-                    }
-
-                    seqNum++;
-                }
-
-                dataSize = seqNum;
-
-                // For an error which we don't know how to handle, we will not try next attempts and terminate.
-                if (unknownEx != null)
-                    throw new IgniteException(errorMsg, unknownEx);
-
-                // Remembering any of last errors.
-                if (tblAbsenceEx != null)
-                    error = tblAbsenceEx;
-                else if (hostsAvailEx != null)
-                    error = hostsAvailEx;
-                else if (prepStatEx != null)
-                    error = prepStatEx;
-
-                // Clean errors info before next communication with Cassandra.
-                unknownEx = null;
-                tblAbsenceEx = null;
-                hostsAvailEx = null;
-                prepStatEx = null;
-
-                // Phase 2: wait for the submitted statements and process the first row of each result.
-                for (Cache.Entry<Integer, ResultSetFuture> futureResult : futResults) {
-                    try {
-                        ResultSet resSet = futureResult.getValue().getUninterruptibly();
-                        Row row = resSet != null && resSet.iterator().hasNext() ? resSet.iterator().next() : null;
-
-                        if (row != null)
-                            assistant.process(row, futureResult.getKey());
-                    }
-                    catch (Throwable e) {
-                        // Errors are only classified here; they are handled once, after all futures completed.
-                        if (CassandraHelper.isTableAbsenceError(e))
-                            tblAbsenceEx = e;
-                        else if (CassandraHelper.isHostsAvailabilityError(e))
-                            hostsAvailEx = e;
-                        else if (CassandraHelper.isPreparedStatementClusterError(e))
-                            prepStatEx = e;
-                        else
-                            unknownEx = e;
-                    }
-                }
-
-                // For an error which we don't know how to handle, we will not try next attempts and terminate.
-                if (unknownEx != null)
-                    throw new IgniteException(errorMsg, unknownEx);
-
-                // If there are no errors occurred it means that operation successfully completed and we can return.
-                if (tblAbsenceEx == null && hostsAvailEx == null && prepStatEx == null)
-                    return assistant.processedData();
-
-                if (tblAbsenceEx != null) {
-                    // If there are table absence error and it is not required for the operation we can return.
-                    if (!assistant.tableExistenceRequired())
-                        return assistant.processedData();
-
-                    error = tblAbsenceEx;
-                    handleTableAbsenceError(assistant.getPersistenceSettings());
-                }
-
-                if (hostsAvailEx != null) {
-                    error = hostsAvailEx;
-                    handleHostsAvailabilityError(hostsAvailEx, attempt, errorMsg);
-                }
-
-                if (prepStatEx != null) {
-                    error = prepStatEx;
-                    handlePreparedStatementClusterError(prepStatEx);
-                }
-
-                sleeper.sleep();
-
-                attempt++;
-            }
-        }
-        catch (Throwable e) {
-            // Covers both the exceptions thrown above and any failure raised by the error handlers.
-            error = e;
-        }
-        finally {
-            decrementSessionRefs();
-        }
-
-        // Reached only on failure: either all attempts were used up or a non-recoverable error occurred.
-        errorMsg = "Failed to process " + (dataSize - assistant.processedCount()) +
-            " of " + dataSize + " elements, during " + assistant.operationName() +
-            " operation with Cassandra";
-
-        log.error(errorMsg, error);
-
-        throw new IgniteException(errorMsg, error);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Executes the assistant's statement and passes every returned row to the assistant. Returns silently
-     * when the result set is empty or when the table is absent. Up to {@code CQL_EXECUTION_ATTEMPTS_COUNT}
-     * attempts are made, sleeping between them.
-     */
-    @Override public void execute(BatchLoaderAssistant assistant) {
-        int attempt = 0;
-        String errorMsg = "Failed to execute Cassandra " + assistant.operationName() + " operation";
-        Throwable error = new IgniteException(errorMsg);
-
-        RandomSleeper sleeper = newSleeper();
-
-        // Keep the session referenced for the whole operation; released in the finally block below.
-        incrementSessionRefs();
-
-        try {
-            while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
-                if (attempt != 0)
-                    log.warning("Trying " + (attempt + 1) + " attempt to load Ignite cache");
-
-                Statement statement = tuneStatementExecutionOptions(assistant.getStatement());
-
-                try {
-                    ResultSetFuture fut = session().executeAsync(statement);
-                    ResultSet resSet = fut.getUninterruptibly();
-
-                    if (resSet == null || !resSet.iterator().hasNext())
-                        return;
-
-                    for (Row row : resSet)
-                        assistant.process(row);
-
-                    return;
-                }
-                catch (Throwable e) {
-                    error = e;
-
-                    // A missing table is treated as "nothing to load" rather than as a failure.
-                    if (CassandraHelper.isTableAbsenceError(e))
-                        return;
-                    else if (CassandraHelper.isHostsAvailabilityError(e))
-                        handleHostsAvailabilityError(e, attempt, errorMsg);
-                    else if (CassandraHelper.isPreparedStatementClusterError(e))
-                        handlePreparedStatementClusterError(e);
-                    else
-                        // For an error which we don't know how to handle, we will not try next attempts and terminate.
-                        // Note: this exception is caught by the outer catch below and wrapped once more.
-                        throw new IgniteException(errorMsg, e);
-                }
-
-                sleeper.sleep();
-
-                attempt++;
-            }
-        }
-        catch (Throwable e) {
-            // Covers both the exception thrown above and any failure raised by the error handlers.
-            error = e;
-        }
-        finally {
-            decrementSessionRefs();
-        }
-
-        // Reached only on failure: either all attempts were used up or a non-recoverable error occurred.
-        log.error(errorMsg, error);
-
-        throw new IgniteException(errorMsg, error);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Drops one session reference; once no references remain, an open driver session is handed
-     * over to the session pool and forgotten by this wrapper.
-     */
-    @Override public synchronized void close() throws IOException {
-        // The reference is always released first; nothing else to do while the session
-        // is still in use or was never opened.
-        if (decrementSessionRefs() != 0 || ses == null)
-            return;
-
-        SessionPool.put(this, ses);
-        ses = null;
-    }
-
-    /**
-     * Recreates Cassandra driver session: closes the current one, opens a replacement through
-     * {@code session()} and empties the prepared statements cache.
-     */
-    private synchronized void refresh() {
-        //make sure that session removed from the pool (the value returned by the pool is deliberately not used)
-        SessionPool.get(this);
-
-        //closing and reopening session
-        CassandraHelper.closeSession(ses);
-        ses = null;
-        session();
-
-        // Cached prepared statements are dropped together with the session that was just closed.
-        synchronized (sesStatements) {
-            sesStatements.clear();
-        }
-    }
-
-    /**
-     * Provides the driver session, taking it from the session pool or opening a new connection
-     * when this wrapper does not hold one yet.
-     *
-     * @return Cassandra driver session.
-     */
-    private synchronized Session session() {
-        if (ses == null) {
-            // First choice: a session previously parked in the pool for this wrapper.
-            ses = SessionPool.get(this);
-
-            if (ses == null) {
-                // A brand new session is about to be opened, so cached statements are discarded.
-                synchronized (sesStatements) {
-                    sesStatements.clear();
-                }
-
-                try {
-                    ses = builder.build().connect();
-                }
-                catch (Throwable e) {
-                    throw new IgniteException("Failed to establish session with Cassandra database", e);
-                }
-            }
-        }
-
-        return ses;
-    }
-
-    /**
-     * Registers one more user of the Cassandra driver session (needed in a multithreaded environment).
-     */
-    private synchronized void incrementSessionRefs() {
-        refCnt += 1;
-    }
-
-    /**
-     * Unregisters one user of the Cassandra driver session (needed in a multithreaded environment).
-     * The counter is left untouched when it is already zero.
-     *
-     * @return Number of references remaining after the call.
-     */
-    private synchronized int decrementSessionRefs() {
-        if (refCnt == 0)
-            return 0;
-
-        refCnt -= 1;
-
-        return refCnt;
-    }
-
-    /**
-     * Prepares CQL statement using current Cassandra driver session. Statements are cached by their CQL
-     * text, so a statement already present in the cache is returned without contacting Cassandra.
-     *
-     * @param statement CQL statement.
-     * @param settings Persistence settings.
-     * @param tblExistenceRequired Flag indicating if table existence is required for the statement.
-     * @return Prepared statement, or {@code null} if the table is absent and its existence is not required.
-     */
-    private PreparedStatement prepareStatement(String statement, KeyValuePersistenceSettings settings,
-        boolean tblExistenceRequired) {
-
-        int attempt = 0;
-        Throwable error = null;
-        String errorMsg = "Failed to prepare Cassandra CQL statement: " + statement;
-
-        RandomSleeper sleeper = newSleeper();
-
-        // Keep the session referenced while preparing; released in the finally block below.
-        incrementSessionRefs();
-
-        try {
-            // Fast path: statement was already prepared.
-            synchronized (sesStatements) {
-                if (sesStatements.containsKey(statement))
-                    return sesStatements.get(statement);
-            }
-
-            while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
-                try {
-                    // Preparation runs outside the cache monitor, so concurrent callers may prepare the same
-                    // statement; the last one to finish overwrites the cache entry.
-                    PreparedStatement prepStatement = session().prepare(statement);
-
-                    synchronized (sesStatements) {
-                        sesStatements.put(statement, prepStatement);
-                    }
-
-                    return prepStatement;
-                }
-                catch (Throwable e) {
-                    if (CassandraHelper.isTableAbsenceError(e)) {
-                        if (!tblExistenceRequired)
-                            return null;
-
-                        handleTableAbsenceError(settings);
-                    }
-                    else if (CassandraHelper.isHostsAvailabilityError(e))
-                        handleHostsAvailabilityError(e, attempt, errorMsg);
-                    else
-                        // Any other error is not retried.
-                        throw new IgniteException(errorMsg, e);
-
-                    error = e;
-                }
-
-                sleeper.sleep();
-
-                attempt++;
-            }
-        }
-        finally {
-            decrementSessionRefs();
-        }
-
-        // All attempts were used up; report the last recoverable error.
-        throw new IgniteException(errorMsg, error);
-    }
-
-    /**
-     * Issues the keyspace DDL statement from the persistence settings. An already existing keyspace counts
-     * as success; host availability problems are retried; any other failure is reported immediately.
-     *
-     * @param settings Persistence settings.
-     */
-    private void createKeyspace(KeyValuePersistenceSettings settings) {
-        String errorMsg = "Failed to create Cassandra keyspace '" + settings.getKeyspace() + "'";
-        Throwable lastErr = null;
-
-        for (int attempt = 0; attempt < CQL_EXECUTION_ATTEMPTS_COUNT; attempt++) {
-            try {
-                log.info("-----------------------------------------------------------------------");
-                log.info("Creating Cassandra keyspace '" + settings.getKeyspace() + "'");
-                log.info("-----------------------------------------------------------------------\n\n" +
-                    settings.getKeyspaceDDLStatement() + "\n");
-                log.info("-----------------------------------------------------------------------");
-
-                session().execute(settings.getKeyspaceDDLStatement());
-
-                log.info("Cassandra keyspace '" + settings.getKeyspace() + "' was successfully created");
-
-                return;
-            }
-            catch (AlreadyExistsException ignored) {
-                // Somebody else created the keyspace - nothing left to do.
-                log.info("Cassandra keyspace '" + settings.getKeyspace() + "' already exist");
-
-                return;
-            }
-            catch (Throwable e) {
-                if (CassandraHelper.isHostsAvailabilityError(e)) {
-                    handleHostsAvailabilityError(e, attempt, errorMsg);
-
-                    lastErr = e;
-                }
-                else
-                    throw new IgniteException(errorMsg, e);
-            }
-        }
-
-        throw new IgniteException(errorMsg, lastErr);
-    }
-
-    /**
-     * Issues the table DDL statement from the persistence settings. An already existing table counts as
-     * success; a missing keyspace is created and the table creation retried; host availability problems
-     * are retried; any other failure is reported immediately.
-     *
-     * @param settings Persistence settings.
-     */
-    private void createTable(KeyValuePersistenceSettings settings) {
-        String errorMsg = "Failed to create Cassandra table '" + settings.getTableFullName() + "'";
-        Throwable lastErr = null;
-
-        for (int attempt = 0; attempt < CQL_EXECUTION_ATTEMPTS_COUNT; attempt++) {
-            try {
-                log.info("-----------------------------------------------------------------------");
-                log.info("Creating Cassandra table '" + settings.getTableFullName() + "'");
-                log.info("-----------------------------------------------------------------------\n\n" +
-                    settings.getTableDDLStatement() + "\n");
-                log.info("-----------------------------------------------------------------------");
-
-                session().execute(settings.getTableDDLStatement());
-
-                log.info("Cassandra table '" + settings.getTableFullName() + "' was successfully created");
-
-                return;
-            }
-            catch (AlreadyExistsException ignored) {
-                // Somebody else created the table - nothing left to do.
-                log.info("Cassandra table '" + settings.getTableFullName() + "' already exist");
-
-                return;
-            }
-            catch (Throwable e) {
-                if (CassandraHelper.isKeyspaceAbsenceError(e)) {
-                    log.warning("Failed to create Cassandra table '" + settings.getTableFullName() +
-                        "' cause appropriate keyspace doesn't exist", e);
-
-                    createKeyspace(settings);
-                }
-                else if (CassandraHelper.isHostsAvailabilityError(e))
-                    handleHostsAvailabilityError(e, attempt, errorMsg);
-                else
-                    throw new IgniteException(errorMsg, e);
-
-                lastErr = e;
-            }
-        }
-
-        throw new IgniteException(errorMsg, lastErr);
-    }
-
-    /**
-     * Creates Cassandra table indexes by executing every index DDL statement from the persistence
-     * settings. Does nothing when the settings define no index statements. Indexes which already exist
-     * are skipped; a missing table is created and the whole pass retried; host availability problems
-     * are retried as well.
-     *
-     * @param settings Persistence settings.
-     */
-    private void createTableIndexes(KeyValuePersistenceSettings settings) {
-        if (settings.getIndexDDLStatements() == null || settings.getIndexDDLStatements().isEmpty())
-            return;
-
-        int attempt = 0;
-        Throwable error = null;
-        String errorMsg = "Failed to create indexes for Cassandra table " + settings.getTableFullName();
-
-        while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
-            try {
-                log.info("Creating indexes for Cassandra table '" + settings.getTableFullName() + "'");
-
-                for (String statement : settings.getIndexDDLStatements()) {
-                    try {
-                        session().execute(statement);
-                    }
-                    catch (AlreadyExistsException ignored) {
-                        // Index is already there - proceed with the next statement.
-                    }
-                    catch (Throwable e) {
-                        // Constant-first comparison: the exception message may be null, and a
-                        // NullPointerException raised here would hide the real failure.
-                        if (!(e instanceof InvalidQueryException) || !"Index already exists".equals(e.getMessage()))
-                            throw new IgniteException(errorMsg, e);
-                    }
-                }
-
-                log.info("Indexes for Cassandra table '" + settings.getTableFullName() + "' were successfully created");
-
-                return;
-            }
-            catch (Throwable e) {
-                // Also receives the IgniteException thrown from the inner catch above.
-                if (CassandraHelper.isHostsAvailabilityError(e))
-                    handleHostsAvailabilityError(e, attempt, errorMsg);
-                else if (CassandraHelper.isTableAbsenceError(e))
-                    createTable(settings);
-                else
-                    throw new IgniteException(errorMsg, e);
-
-                error = e;
-            }
-
-            attempt++;
-        }
-
-        throw new IgniteException(errorMsg, error);
-    }
-
-    /**
-     * Tunes CQL statement execution options (consistency level, fetch option and etc.).
-     * <p>
-     * The statement kind is derived from the first keyword of its query string: {@code select} is a read,
-     * {@code insert}/{@code delete}/{@code update} and batches are writes. Options which are not configured
-     * ({@code null}) are left untouched on the statement.
-     *
-     * @param statement Statement.
-     * @return The same statement instance, modified in place.
-     */
-    private Statement tuneStatementExecutionOptions(Statement statement) {
-        String qry = "";
-
-        // Locale.ROOT keeps keyword detection independent of the JVM default locale: with a Turkish
-        // locale "INSERT".toLowerCase() does not produce "insert", so write consistency would be skipped.
-        if (statement instanceof BoundStatement)
-            qry = ((BoundStatement)statement).preparedStatement().getQueryString().trim()
-                .toLowerCase(java.util.Locale.ROOT);
-        else if (statement instanceof PreparedStatement)
-            qry = ((PreparedStatement)statement).getQueryString().trim().toLowerCase(java.util.Locale.ROOT);
-
-        boolean readStatement = qry.startsWith("select");
-        boolean writeStatement = statement instanceof Batch || statement instanceof BatchStatement ||
-            qry.startsWith("insert") || qry.startsWith("delete") || qry.startsWith("update");
-
-        if (readStatement && readConsistency != null)
-            statement.setConsistencyLevel(readConsistency);
-
-        if (writeStatement && writeConsistency != null)
-            statement.setConsistencyLevel(writeConsistency);
-
-        if (fetchSize != null)
-            statement.setFetchSize(fetchSize);
-
-        return statement;
-    }
-
-    /**
-     * Handles situation when Cassandra table doesn't exist by creating the keyspace, the table and its
-     * indexes. When several threads arrive together, only the one which takes handler number 0 performs
-     * the creation; the others wait for the monitor and then return without doing anything.
-     *
-     * @param settings Persistence settings.
-     */
-    private void handleTableAbsenceError(KeyValuePersistenceSettings settings) {
-        // The counter starts at -1, so the first thread to arrive gets 0.
-        int hndNum = tblAbsenceHandlersCnt.incrementAndGet();
-
-        try {
-            synchronized (tblAbsenceHandlersCnt) {
-                // Oooops... I am not the first thread who tried to handle table absence problem.
-                if (hndNum != 0) {
-                    log.warning("Table " + settings.getTableFullName() + " absence problem detected. " +
-                            "Another thread already fixed it.");
-                    return;
-                }
-
-                log.warning("Table " + settings.getTableFullName() + " absence problem detected. " +
-                        "Trying to create table.");
-
-                // Pre-set to a non-null value only so that the loop below runs at least once.
-                IgniteException error = new IgniteException("Failed to create Cassandra table " + settings.getTableFullName());
-
-                int attempt = 0;
-
-                while (error != null && attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
-                    // Stays null when the attempt succeeds, which ends the loop.
-                    error = null;
-
-                    try {
-                        createKeyspace(settings);
-                        createTable(settings);
-                        createTableIndexes(settings);
-                    }
-                    catch (Throwable e) {
-                        if (CassandraHelper.isHostsAvailabilityError(e))
-                            handleHostsAvailabilityError(e, attempt, null);
-                        else
-                            throw new IgniteException("Failed to create Cassandra table " + settings.getTableFullName(), e);
-
-                        error = (e instanceof IgniteException) ? (IgniteException)e : new IgniteException(e);
-                    }
-
-                    attempt++;
-                }
-
-                if (error != null)
-                    throw error;
-            }
-        }
-        finally {
-            // Only the thread which did the work resets the counter, re-arming the handler for later errors.
-            if (hndNum == 0)
-                tblAbsenceHandlersCnt.set(-1);
-        }
-    }
-
-    /**
-     * Handles the situation when prepared statement execution failed because the session to the cluster
-     * was released, by refreshing the Cassandra session. When several threads arrive together, only the
-     * one which takes handler number 0 performs the refresh; the others wait for the monitor and then
-     * return without doing anything.
-     *
-     * @param e Exception which triggered the handling; used for logging only.
-     */
-    private void handlePreparedStatementClusterError(Throwable e) {
-        // The counter starts at -1, so the first thread to arrive gets 0.
-        int hndNum = prepStatementHandlersCnt.incrementAndGet();
-
-        try {
-            synchronized (prepStatementHandlersCnt) {
-                // Oooops... I am not the first thread who tried to handle prepared statement problem.
-                if (hndNum != 0) {
-                    log.warning("Prepared statement cluster error detected, another thread already fixed the problem", e);
-                    return;
-                }
-
-                log.warning("Prepared statement cluster error detected, refreshing Cassandra session", e);
-
-                refresh();
-
-                log.warning("Cassandra session refreshed");
-            }
-        }
-        finally {
-            // Only the thread which did the work resets the counter, re-arming the handler for later errors.
-            if (hndNum == 0)
-                prepStatementHandlersCnt.set(-1);
-        }
-    }
-
-    /**
-     * Handles situation when Cassandra host which is responsible for CQL query execution became unavailable.
-     * <p>
-     * At roughly each quarter of the allowed attempts, and at the last attempt, the Cassandra session is
-     * refreshed; at every other attempt the calling thread just sleeps for
-     * {@code CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT} milliseconds. If the thread is interrupted while sleeping,
-     * its interrupt status is restored so that callers can still observe the interruption.
-     *
-     * @param e Exception to handle.
-     * @param attempt Number of attempts (zero based).
-     * @param msg Error message for the thrown exception, may be {@code null}.
-     * @return {@code true} if the session was refreshed, {@code false} if the method only slept.
-     * @throws IgniteException If the number of attempts reached its maximum.
-     */
-    private boolean handleHostsAvailabilityError(Throwable e, int attempt, String msg) {
-        if (attempt >= CQL_EXECUTION_ATTEMPTS_COUNT) {
-            log.error("Host availability problem detected. " +
-                    "Number of CQL execution attempts reached maximum " + CQL_EXECUTION_ATTEMPTS_COUNT +
-                    ", exception will be thrown to upper execution layer.", e);
-            throw msg == null ? new IgniteException(e) : new IgniteException(msg, e);
-        }
-
-        // Refresh the session at 1/4, 1/2 and 3/4 of the attempts and at the very last attempt.
-        if (attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 4  ||
-            attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2  ||
-            attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2 + CQL_EXECUTION_ATTEMPTS_COUNT / 4  ||
-            attempt == CQL_EXECUTION_ATTEMPTS_COUNT - 1) {
-            log.warning("Host availability problem detected, CQL execution attempt  " + (attempt + 1) + ", " +
-                    "refreshing Cassandra session", e);
-
-            refresh();
-
-            log.warning("Cassandra session refreshed");
-
-            return true;
-        }
-
-        log.warning("Host availability problem detected, CQL execution attempt " + (attempt + 1) + ", " +
-                "sleeping extra " + CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT + " milliseconds", e);
-
-        try {
-            Thread.sleep(CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT);
-        }
-        catch (InterruptedException interrupted) {
-            // Do not lose the interruption request: restore the flag for the code further up the stack.
-            Thread.currentThread().interrupt();
-        }
-
-        log.warning("Sleep completed");
-
-        return false;
-    }
-
-    /**
-     * Creates the sleeper used between CQL execution attempts, configured with the min timeout,
-     * max timeout and timeout increment constants of this class.
-     *
-     * @return New random sleeper.
-     */
-    private RandomSleeper newSleeper() {
-        RandomSleeper sleeper = new RandomSleeper(
-            CQL_EXECUTION_ATTEMPT_MIN_TIMEOUT, CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT, CQL_ATTEMPTS_TIMEOUT_INCREMENT, log);
-
-        return sleeper;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
deleted file mode 100644
index 867f58d..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.session;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.Row;
-import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
-
-/**
- * Provides information for single operations (load, delete, write) of Ignite cache
- * backed by {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}.
- *
- * @param <R> type of the result returned from operation.
- */
-public interface ExecutionAssistant<R> {
-    /**
-     * Indicates if Cassandra table existence is required for operation.
-     *
-     * @return true if table existence required.
-     */
-    public boolean tableExistenceRequired();
-
-    /**
-     * Returns CQL statement to be used for operation.
-     *
-     * @return CQL statement.
-     */
-    public String getStatement();
-
-    /**
-     * Binds prepared statement.
-     *
-     * @param statement prepared statement.
-     *
-     * @return bound statement.
-     */
-    public BoundStatement bindStatement(PreparedStatement statement);
-
-    /**
-     * Persistence settings to use for operation.
-     *
-     * @return persistence settings.
-     */
-    public KeyValuePersistenceSettings getPersistenceSettings();
-
-    /**
-     * Returns operation name.
-     *
-     * @return operation name.
-     */
-    public String operationName();
-
-    /**
-     * Processes Cassandra database table row returned by specified CQL statement.
-     *
-     * @param row Cassandra database table row.
-     *
-     * @return result of the operation.
-     */
-    public R process(Row row);
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
deleted file mode 100644
index 17494dd..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.session;
-
-import com.datastax.driver.core.Row;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Implementation of the {@link org.apache.ignite.cache.store.cassandra.session.BatchExecutionAssistant}.
- *
- * @param <R> Type of the result returned from batch operation
- * @param <V> Type of the value used in batch operation
- */
-public abstract class GenericBatchExecutionAssistant<R, V> implements BatchExecutionAssistant<R, V> {
-    /** Identifiers of already processed objects. */
-    private Set<Integer> processed = new HashSet<>();
-
-    /** {@inheritDoc} */
-    @Override public void process(Row row, int seqNum) {
-        if (processed.contains(seqNum))
-            return;
-
-        process(row);
-
-        processed.add(seqNum);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean alreadyProcessed(int seqNum) {
-        return processed.contains(seqNum);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int processedCount() {
-        return processed.size();
-    }
-
-    /** {@inheritDoc} */
-    @Override public R processedData() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean tableExistenceRequired() {
-        return false;
-    }
-
-    /**
-     * Processes particular row inside batch operation.
-     *
-     * @param row Row to process.
-     */
-    protected void process(Row row) {
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
deleted file mode 100644
index d3ace7d..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.session;
-
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.Statement;
-import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
-import org.apache.ignite.lang.IgniteBiInClosure;
-
-/**
- * Worker for load cache using custom user query.
- *
- * @param <K> Key type.
- * @param <V> Value type.
- */
-public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> {
-    /** Cassandra session to execute CQL query */
-    private final CassandraSession ses;
-
-    /** User query. */
-    private final String qry;
-
-    /** Persistence controller */
-    private final PersistenceController ctrl;
-
-    /** Logger */
-    private final IgniteLogger log;
-
-    /** Closure for loaded values. */
-    private final IgniteBiInClosure<K, V> clo;
-
-    /**
-     * @param clo Closure for loaded values.
-     */
-    public LoadCacheCustomQueryWorker(CassandraSession ses, String qry, PersistenceController ctrl,
-        IgniteLogger log, IgniteBiInClosure<K, V> clo) {
-        this.ses = ses;
-        this.qry = qry.trim().endsWith(";") ? qry : qry + ";";
-        this.ctrl = ctrl;
-        this.log = log;
-        this.clo = clo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Void call() throws Exception {
-        ses.execute(new BatchLoaderAssistant() {
-            /** {@inheritDoc} */
-            @Override public String operationName() {
-                return "loadCache";
-            }
-
-            /** {@inheritDoc} */
-            @Override public Statement getStatement() {
-                return new SimpleStatement(qry);
-            }
-
-            /** {@inheritDoc} */
-            @Override public void process(Row row) {
-                K key;
-                V val;
-
-                try {
-                    key = (K)ctrl.buildKeyObject(row);
-                }
-                catch (Throwable e) {
-                    log.error("Failed to build Ignite key object from provided Cassandra row", e);
-
-                    throw new IgniteException("Failed to build Ignite key object from provided Cassandra row", e);
-                }
-
-                try {
-                    val = (V)ctrl.buildValueObject(row);
-                }
-                catch (Throwable e) {
-                    log.error("Failed to build Ignite value object from provided Cassandra row", e);
-
-                    throw new IgniteException("Failed to build Ignite value object from provided Cassandra row", e);
-                }
-
-                clo.apply(key, val);
-            }
-        });
-
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java
deleted file mode 100644
index ecbbe78..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains classes responsible for handling sessions and communication with Cassandra
- */
-package org.apache.ignite.cache.store.cassandra.session;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
deleted file mode 100644
index fc4a907..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.session.pool;
-
-import com.datastax.driver.core.Session;
-import java.lang.Thread.State;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
-
-/**
- * Cassandra driver sessions pool.
- */
-public class SessionPool {
-    /**
-     * Monitors the session pool and closes unused sessions.
-     */
-    private static class SessionMonitor extends Thread {
-        /** {@inheritDoc} */
-        @Override public void run() {
-            try {
-                while (true) {
-                    try {
-                        Thread.sleep(SLEEP_TIMEOUT);
-                    }
-                    catch (InterruptedException ignored) {
-                        return;
-                    }
-
-                    List<Map.Entry<CassandraSessionImpl, SessionWrapper>> expiredSessions = new LinkedList<>();
-
-                    int sessionsCnt;
-
-                    synchronized (sessions) {
-                        sessionsCnt = sessions.size();
-
-                        for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : sessions.entrySet()) {
-                            if (entry.getValue().expired())
-                                expiredSessions.add(entry);
-                        }
-
-                        for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : expiredSessions)
-                            sessions.remove(entry.getKey());
-                    }
-
-                    for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : expiredSessions)
-                        entry.getValue().release();
-
-                    // all sessions in the pool expired, thus we don't need additional thread to manage sessions in the pool
-                    if (sessionsCnt == expiredSessions.size())
-                        return;
-                }
-            }
-            finally {
-                release();
-            }
-        }
-    }
-
-    /** Sessions monitor sleep timeout. */
-    private static final long SLEEP_TIMEOUT = 60000; // 1 minute.
-
-    /** Sessions which were returned to pool. */
-    private static final Map<CassandraSessionImpl, SessionWrapper> sessions = new HashMap<>();
-
-    /** Singleton instance. */
-    private static SessionMonitor monitorSingleton;
-
-    static {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override public void run() {
-                release();
-            }
-        });
-    }
-
-    /**
-     * Returns Cassandra driver session to sessions pool.
-     *
-     * @param cassandraSes Session wrapper.
-     * @param driverSes Driver session.
-     */
-    public static void put(CassandraSessionImpl cassandraSes, Session driverSes) {
-        if (cassandraSes == null || driverSes == null)
-            return;
-
-        SessionWrapper old;
-
-        synchronized (sessions) {
-            old = sessions.put(cassandraSes, new SessionWrapper(driverSes));
-
-            if (monitorSingleton == null || State.TERMINATED.equals(monitorSingleton.getState())) {
-                monitorSingleton = new SessionMonitor();
-                monitorSingleton.setDaemon(true);
-                monitorSingleton.setName("Cassandra-sessions-pool");
-                monitorSingleton.start();
-            }
-        }
-
-        if (old != null)
-            old.release();
-    }
-
-    /**
-     * Extracts Cassandra driver session from pool.
-     *
-     * @param cassandraSes Session wrapper.
-     * @return Cassandra driver session.
-     */
-    public static Session get(CassandraSessionImpl cassandraSes) {
-        if (cassandraSes == null)
-            return null;
-
-        SessionWrapper wrapper;
-
-        synchronized (sessions) {
-            wrapper = sessions.remove(cassandraSes);
-        }
-
-        return wrapper == null ? null : wrapper.driverSession();
-    }
-
-    /**
-     * Releases all sessions from the pool and closes all their connections to the Cassandra database.
-     */
-    public static void release() {
-        Collection<SessionWrapper> wrappers;
-
-        synchronized (sessions) {
-            try {
-                if (sessions.size() == 0)
-                    return;
-
-                wrappers = new LinkedList<>();
-
-                for (SessionWrapper wrapper : sessions.values())
-                    wrappers.add(wrapper);
-
-                sessions.clear();
-            }
-            finally {
-                if (!(Thread.currentThread() instanceof SessionMonitor) && monitorSingleton != null) {
-                    try {
-                        monitorSingleton.interrupt();
-                    }
-                    catch (Throwable ignored) {
-                    }
-                }
-            }
-        }
-
-        for (SessionWrapper wrapper : wrappers)
-            wrapper.release();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
deleted file mode 100644
index 7c5722b..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.session.pool;
-
-import com.datastax.driver.core.Session;
-import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
-
-/**
- * Wrapper for Cassandra driver session, responsible for monitoring session expiration and its closing.
- */
-public class SessionWrapper {
-    /** Expiration timeout for Cassandra driver session. */
-    public static final long DFLT_EXPIRATION_TIMEOUT = 300000;  // 5 minutes.
-
-    /** Cassandra driver session. */
-    private Session ses;
-
-    /** Wrapper creation time.  */
-    private long time;
-
-    /**
-     * Creates instance of Cassandra driver session wrapper.
-     *
-     * @param ses Cassandra driver session.
-     */
-    public SessionWrapper(Session ses) {
-        this.ses = ses;
-        this.time = System.currentTimeMillis();
-    }
-
-    /**
-     * Checks if Cassandra driver session expired.
-     *
-     * @return true if session expired.
-     */
-    public boolean expired() {
-        return System.currentTimeMillis() - time > DFLT_EXPIRATION_TIMEOUT;
-    }
-
-    /**
-     * Returns wrapped Cassandra driver session.
-     *
-     * @return Cassandra driver session.
-     */
-    public Session driverSession() {
-        return ses;
-    }
-
-    /**
-     * Closes wrapped Cassandra driver session
-     */
-    public void release() {
-        CassandraHelper.closeSession(ses);
-        ses = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java
deleted file mode 100644
index 21c292f..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains session pool implementation for Cassandra sessions
- */
-package org.apache.ignite.cache.store.cassandra.session.pool;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
deleted file mode 100644
index 4f40478..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.utils;
-
-import java.io.File;
-import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
-
-/**
- * Generates Cassandra DDL statements from persistence descriptor xml file.
- */
-public class DDLGenerator {
-    /**
-     * DDLGenerator entry point.
-     *
-     * @param args Arguments for DDLGenerator.
-     */
-    public static void main(String[] args) {
-        if (args == null || args.length == 0)
-            return;
-
-        for (String arg : args) {
-            File file = new File(arg);
-            if (!file.isFile()) {
-                System.out.println("-------------------------------------------------------------");
-                System.out.println("Incorrect file specified: " + arg);
-                System.out.println("-------------------------------------------------------------");
-                continue;
-            }
-
-            try {
-                KeyValuePersistenceSettings settings = new KeyValuePersistenceSettings(file);
-                System.out.println("-------------------------------------------------------------");
-                System.out.println("DDL for keyspace/table from file: " + arg);
-                System.out.println("-------------------------------------------------------------");
-                System.out.println();
-                System.out.println(settings.getKeyspaceDDLStatement());
-                System.out.println();
-                System.out.println(settings.getTableDDLStatement());
-                System.out.println();
-            }
-            catch (Throwable e) {
-                System.out.println("-------------------------------------------------------------");
-                System.out.println("Incorrect file specified: " + arg);
-                System.out.println("-------------------------------------------------------------");
-                e.printStackTrace();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java
deleted file mode 100644
index 2460dfe..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains utility classes
- */
-package org.apache.ignite.cache.store.cassandra.utils;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/test/bootstrap/aws/README.txt
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/bootstrap/aws/README.txt b/modules/cassandra/src/test/bootstrap/aws/README.txt
deleted file mode 100644
index 4457d81..0000000
--- a/modules/cassandra/src/test/bootstrap/aws/README.txt
+++ /dev/null
@@ -1,13 +0,0 @@
-Shell scripts to spin up Ignite, Cassandra and Load tests clusters in AWS.
-
-1) cassandra - bootstrap scripts for Cassandra cluster nodes
-2) ganglia - bootstrap scripts for Ganglia master and agents
-3) ignite - bootstrap scripts for Ignite cluster nodes
-4) tests - bootstrap scripts for Load Tests cluster nodes
-5) common.sh - definitions for common functions
-6) env.sh - definitions for common variables
-7) log-collector.sh - log collector daemon script, to collect logs and upload them to S3
-
-For more details please look at the documentation:
-
-    https://apacheignite.readme.io/docs/aws-infrastructure-deployment
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh b/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh
deleted file mode 100644
index 017b1b1..0000000
--- a/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh
+++ /dev/null
@@ -1,336 +0,0 @@
-#!/bin/sh
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# -----------------------------------------------------------------------------------------------
-# Bootstrap script to spin up Cassandra cluster
-# -----------------------------------------------------------------------------------------------
-
-# URL to download AWS CLI tools
-AWS_CLI_DOWNLOAD_URL=https://s3.amazonaws.com/aws-cli/awscli-bundle.zip
-
-# URL to download JDK
-JDK_DOWNLOAD_URL=http://download.oracle.com/otn-pub/java/jdk/8u77-b03/jdk-8u77-linux-x64.tar.gz
-
-# URL to download Ignite-Cassandra tests package - you should previously package and upload it to this place
-TESTS_PACKAGE_DONLOAD_URL=s3://<bucket>/<folder>/ignite-cassandra-tests-<version>.zip
-
-# Terminates script execution and upload logs to S3
-terminate()
-{
-    SUCCESS_URL=$S3_CASSANDRA_BOOTSTRAP_SUCCESS
-    FAILURE_URL=$S3_CASSANDRA_BOOTSTRAP_FAILURE
-
-    if [ -n "$SUCCESS_URL" ] && [[ "$SUCCESS_URL" != */ ]]; then
-        SUCCESS_URL=${SUCCESS_URL}/
-    fi
-
-    if [ -n "$FAILURE_URL" ] && [[ "$FAILURE_URL" != */ ]]; then
-        FAILURE_URL=${FAILURE_URL}/
-    fi
-
-    host_name=$(hostname -f | tr '[:upper:]' '[:lower:]')
-    msg=$host_name
-
-    if [ -n "$1" ]; then
-        echo "[ERROR] $1"
-        echo "[ERROR]-----------------------------------------------------"
-        echo "[ERROR] Cassandra node bootstrap failed"
-        echo "[ERROR]-----------------------------------------------------"
-        msg=$1
-
-        if [ -z "$FAILURE_URL" ]; then
-            exit 1
-        fi
-
-        reportFolder=${FAILURE_URL}${host_name}
-        reportFile=$reportFolder/__error__
-    else
-        echo "[INFO]-----------------------------------------------------"
-        echo "[INFO] Cassandra node bootstrap successfully completed"
-        echo "[INFO]-----------------------------------------------------"
-
-        if [ -z "$SUCCESS_URL" ]; then
-            exit 0
-        fi
-
-        reportFolder=${SUCCESS_URL}${host_name}
-        reportFile=$reportFolder/__success__
-    fi
-
-    echo $msg > /opt/bootstrap-result
-
-    aws s3 rm --recursive $reportFolder
-    if [ $? -ne 0 ]; then
-        echo "[ERROR] Failed to drop report folder: $reportFolder"
-    fi
-
-    aws s3 cp --sse AES256 /opt/bootstrap-result $reportFile
-    if [ $? -ne 0 ]; then
-        echo "[ERROR] Failed to report bootstrap result to: $reportFile"
-    fi
-
-    rm -f /opt/bootstrap-result
-
-    if [ -n "$1" ]; then
-        exit 1
-    fi
-
-    exit 0
-}
-
-# Downloads specified package
-downloadPackage()
-{
-    echo "[INFO] Downloading $3 package from $1 into $2"
-
-    for i in 0 9;
-    do
-        if [[ "$1" == s3* ]]; then
-            aws s3 cp $1 $2
-            code=$?
-        else
-            curl "$1" -o "$2"
-            code=$?
-        fi
-
-        if [ $code -eq 0 ]; then
-            echo "[INFO] $3 package successfully downloaded from $1 into $2"
-            return 0
-        fi
-
-        echo "[WARN] Failed to download $3 package from $i attempt, sleeping extra 5sec"
-        sleep 5s
-    done
-
-    terminate "All 10 attempts to download $3 package from $1 are failed"
-}
-
-# Installs the JDK into /opt/java: fetches the archive from $JDK_DOWNLOAD_URL,
-# unpacks it under /opt and renames the unpacked directory to "java" if needed.
-# Any failed step ends the bootstrap through terminate.
-setupJava()
-{
-    # Start from a clean state in case of a re-run.
-    rm -Rf /opt/java /opt/jdk.tar.gz
-
-    echo "[INFO] Downloading 'jdk'"
-    wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "$JDK_DOWNLOAD_URL" -O /opt/jdk.tar.gz \
-        || terminate "Failed to download 'jdk'"
-
-    echo "[INFO] Untaring 'jdk'"
-    tar -xvzf /opt/jdk.tar.gz -C /opt \
-        || terminate "Failed to untar 'jdk'"
-
-    rm -Rf /opt/jdk.tar.gz
-
-    # The archive unpacks into a versioned "jdk..." directory.
-    unzipDir=$(ls /opt | grep "jdk")
-    if [ "$unzipDir" = "java" ]; then
-        return 0
-    fi
-
-    mv /opt/$unzipDir /opt/java
-}
-
-# Installs the AWS CLI. pip is tried first; if that fails, the bundled zip
-# installer from $AWS_CLI_DOWNLOAD_URL is downloaded, unpacked and run.
-# A failure in the fallback path ends the bootstrap through terminate.
-setupAWSCLI()
-{
-    echo "[INFO] Installing 'awscli'"
-    pip install --upgrade awscli && return 0
-
-    echo "[ERROR] Failed to install 'awscli' using pip"
-    echo "[INFO] Trying to install awscli using zip archive"
-    echo "[INFO] Downloading awscli zip"
-
-    downloadPackage "$AWS_CLI_DOWNLOAD_URL" "/opt/awscli-bundle.zip" "awscli"
-
-    echo "[INFO] Unzipping awscli zip"
-    unzip /opt/awscli-bundle.zip -d /opt \
-        || terminate "Failed to unzip awscli zip"
-
-    rm -Rf /opt/awscli-bundle.zip
-
-    echo "[INFO] Installing awscli"
-    /opt/awscli-bundle/install -i /usr/local/aws -b /usr/local/bin/aws \
-        || terminate "Failed to install awscli"
-
-    echo "[INFO] Successfully installed awscli from zip archive"
-}
-
-# Installs everything the remaining bootstrap steps rely on: the wget,
-# net-tools, python and unzip packages (through yum) and pip (through the
-# get-pip.py installer). The first failing step ends the bootstrap.
-setupPreRequisites()
-{
-    for pkg in wget net-tools python unzip
-    do
-        echo "[INFO] Installing '$pkg' package"
-        yum -y install "$pkg" \
-            || terminate "Failed to install '$pkg' package"
-    done
-
-    downloadPackage "https://bootstrap.pypa.io/get-pip.py" "/opt/get-pip.py" "get-pip.py"
-
-    echo "[INFO] Installing 'pip'"
-    python /opt/get-pip.py \
-        || terminate "Failed to install 'pip'"
-}
-
-# Downloads and sets up the Ignite-Cassandra tests package, then uses the helper
-# scripts shipped inside it to finish preparing the node.
-#
-# Steps: download and unzip the package into /opt/ignite-cassandra-tests, make
-# its *.sh files executable, source common.sh, then run setupNTP, start the
-# logs collector in the background, print instance info, tag the instance and
-# bootstrap the Ganglia agent.
-#
-# NOTE(review): setupNTP, printInstanceInfo, tagInstance and
-# bootstrapGangliaAgent are not defined in this script; presumably they come
-# from the common.sh sourced below - verify.
-setupTestsPackage()
-{
-    downloadPackage "$TESTS_PACKAGE_DONLOAD_URL" "/opt/ignite-cassandra-tests.zip" "Tests"
-
-    rm -Rf /opt/ignite-cassandra-tests
-
-    unzip /opt/ignite-cassandra-tests.zip -d /opt
-    if [ $? -ne 0 ]; then
-        terminate "Failed to unzip tests package"
-    fi
-
-    rm -f /opt/ignite-cassandra-tests.zip
-
-    # Normalize the unpacked directory name so later paths can be hard-coded.
-    unzipDir=$(ls /opt | grep "ignite-cassandra")
-    if [ "$unzipDir" != "ignite-cassandra-tests" ]; then
-        mv /opt/$unzipDir /opt/ignite-cassandra-tests
-    fi
-
-    find /opt/ignite-cassandra-tests -type f -name "*.sh" -exec chmod ug+x {} \;
-
-    # Sourced (not executed) so that its definitions become available here.
-    . /opt/ignite-cassandra-tests/bootstrap/aws/common.sh "cassandra"
-
-    setupNTP
-
-    echo "[INFO] Starting logs collector daemon"
-
-    # The collector runs in the background; its own output goes to /opt/logs-collector.log.
-    HOST_NAME=$(hostname -f | tr '[:upper:]' '[:lower:]')
-    /opt/ignite-cassandra-tests/bootstrap/aws/logs-collector.sh "$S3_LOGS_TRIGGER" "$S3_CASSANDRA_LOGS/$HOST_NAME" "/opt/cassandra/logs" "/opt/cassandra/cassandra-start.log" > /opt/logs-collector.log &
-
-    echo "[INFO] Logs collector daemon started: $!"
-
-    echo "----------------------------------------------------------------------------------------"
-    printInstanceInfo
-    echo "----------------------------------------------------------------------------------------"
-    tagInstance
-    bootstrapGangliaAgent "cassandra" 8641
-}
-
-# Fetches the Cassandra distribution from $CASSANDRA_DOWNLOAD_URL and unpacks
-# it so that it ends up in /opt/cassandra. A failed unpack ends the bootstrap.
-downloadCassandra()
-{
-    downloadPackage "$CASSANDRA_DOWNLOAD_URL" "/opt/apache-cassandra.tar.gz" "Cassandra"
-
-    # Remove a previous installation, if any, before unpacking.
-    rm -Rf /opt/cassandra
-
-    echo "[INFO] Untaring Cassandra package"
-    tar -xvzf /opt/apache-cassandra.tar.gz -C /opt \
-        || terminate "Failed to untar Cassandra package"
-
-    rm -f /opt/apache-cassandra.tar.gz
-
-    # The archive unpacks into an "apache-cassandra..." directory.
-    unzipDir=$(ls /opt | grep "cassandra" | grep "apache")
-    if [ "$unzipDir" = "cassandra" ]; then
-        return 0
-    fi
-
-    mv /opt/$unzipDir /opt/cassandra
-}
-
-# Sets up Cassandra: creates the 'cassandra' group and user when missing,
-# installs the env script and YAML template shipped with the tests package,
-# substitutes the storage directories into the template and appends the
-# JAVA_HOME / CASSANDRA_HOME / PATH exports to a profile file.
-#
-# Arguments:
-#   $1 - path of the shell profile file the export lines are appended to
-setupCassandra()
-{
-    echo "[INFO] Creating 'cassandra' group"
-    # Any /etc/group line containing "cassandra" counts as "group exists".
-    exists=$(cat /etc/group | grep cassandra)
-    if [ -z "$exists" ]; then
-        groupadd cassandra
-        if [ $? -ne 0 ]; then
-            terminate "Failed to create 'cassandra' group"
-        fi
-    fi
-
-    echo "[INFO] Creating 'cassandra' user"
-    # Same substring check, this time against /etc/passwd.
-    exists=$(cat /etc/passwd | grep cassandra)
-    if [ -z "$exists" ]; then
-        useradd -g cassandra cassandra
-        if [ $? -ne 0 ]; then
-            terminate "Failed to create 'cassandra' user"
-        fi
-    fi
-
-    # Replace the configuration files with the ones from the tests package.
-    rm -f /opt/cassandra/conf/cassandra-env.sh /opt/cassandra/conf/cassandra-template.yaml
-
-    cp /opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-env.sh /opt/cassandra/conf
-    cp /opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-template.yaml /opt/cassandra/conf
-
-    chown -R cassandra:cassandra /opt/cassandra /opt/ignite-cassandra-tests
-
-    # NOTE(review): not defined in this script; presumably provided by the
-    # sourced common.sh and expected to set CASSANDRA_DATA_DIR,
-    # CASSANDRA_COMMITLOG_DIR and CASSANDRA_CACHES_DIR - verify.
-    createCassandraStorageLayout
-
-    # Substitute the ${CASSANDRA_*_DIR} placeholders one at a time through
-    # intermediate files.
-    # NOTE(review): '/' is the sed delimiter here, so the directory values must
-    # not contain unescaped slashes - confirm they are escaped before this point.
-    cat /opt/cassandra/conf/cassandra-template.yaml | sed -r "s/\\\$\{CASSANDRA_DATA_DIR\}/$CASSANDRA_DATA_DIR/g" > /opt/cassandra/conf/cassandra-template-1.yaml
-    cat /opt/cassandra/conf/cassandra-template-1.yaml | sed -r "s/\\\$\{CASSANDRA_COMMITLOG_DIR\}/$CASSANDRA_COMMITLOG_DIR/g" > /opt/cassandra/conf/cassandra-template-2.yaml
-    cat /opt/cassandra/conf/cassandra-template-2.yaml | sed -r "s/\\\$\{CASSANDRA_CACHES_DIR\}/$CASSANDRA_CACHES_DIR/g" > /opt/cassandra/conf/cassandra-template-3.yaml
-
-    # Keep only the fully substituted result, under the original template name.
-    rm -f /opt/cassandra/conf/cassandra-template.yaml /opt/cassandra/conf/cassandra-template-1.yaml /opt/cassandra/conf/cassandra-template-2.yaml
-    mv /opt/cassandra/conf/cassandra-template-3.yaml /opt/cassandra/conf/cassandra-template.yaml
-
-    # \$ keeps the variable references literal in the profile file, so PATH is
-    # expanded when the profile is read rather than now.
-    echo "export JAVA_HOME=/opt/java" >> $1
-    echo "export CASSANDRA_HOME=/opt/cassandra" >> $1
-    echo "export PATH=\$JAVA_HOME/bin:\$CASSANDRA_HOME/bin:\$PATH" >> $1
-}
-
-###################################################################################################################
-
-echo "[INFO]-----------------------------------------------------------------"
-echo "[INFO] Bootstrapping Cassandra node"
-echo "[INFO]-----------------------------------------------------------------"
-
-setupPreRequisites
-setupJava
-setupAWSCLI
-setupTestsPackage
-downloadCassandra
-setupCassandra "/root/.bash_profile"
-
-cmd="/opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-start.sh"
-
-#sudo -u cassandra -g cassandra sh -c "$cmd | tee /opt/cassandra/cassandra-start.log"
-
-$cmd | tee /opt/cassandra/cassandra-start.log
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-env.sh
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-env.sh b/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-env.sh
deleted file mode 100644
index ba76401..0000000
--- a/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-env.sh
+++ /dev/null
@@ -1,287 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# -----------------------------------------------------------------------------------------------
-# Environment setup script from Cassandra distribution
-# -----------------------------------------------------------------------------------------------
-
-# Derives JVM heap settings from the machine's memory and CPU core count.
-#
-# Takes no arguments. Sets two global variables:
-#   MAX_HEAP_SIZE - max(min(1/2 RAM, 1024MB), min(1/4 RAM, 8192MB)), as "<n>M"
-#   HEAP_NEWSIZE  - min(100MB * cores, 1/4 of the max heap), as "<n>M"
-#
-# Memory and core count are detected per OS (Linux, FreeBSD, SunOS, Darwin);
-# on any other system 2048 MB and 2 cores are assumed.
-calculate_heap_sizes()
-{
-    case "`uname`" in
-        Linux)
-            system_memory_in_mb=`free -m | awk '/:/ {print $2;exit}'`
-            system_cpu_cores=`egrep -c 'processor([[:space:]]+):.*' /proc/cpuinfo`
-        ;;
-        FreeBSD)
-            system_memory_in_bytes=`sysctl hw.physmem | awk '{print $2}'`
-            system_memory_in_mb=`expr $system_memory_in_bytes / 1024 / 1024`
-            system_cpu_cores=`sysctl hw.ncpu | awk '{print $2}'`
-        ;;
-        SunOS)
-            system_memory_in_mb=`prtconf | awk '/Memory size:/ {print $3}'`
-            system_cpu_cores=`psrinfo | wc -l`
-        ;;
-        Darwin)
-            system_memory_in_bytes=`sysctl hw.memsize | awk '{print $2}'`
-            system_memory_in_mb=`expr $system_memory_in_bytes / 1024 / 1024`
-            system_cpu_cores=`sysctl hw.ncpu | awk '{print $2}'`
-        ;;
-        *)
-            # assume reasonable defaults for e.g. a modern desktop or
-            # cheap server
-            system_memory_in_mb="2048"
-            system_cpu_cores="2"
-        ;;
-    esac
-
-    # some systems like the raspberry pi don't report cores, use at least 1
-    if [ "$system_cpu_cores" -lt "1" ]
-    then
-        system_cpu_cores="1"
-    fi
-
-    # set max heap size based on the following
-    # max(min(1/2 ram, 1024MB), min(1/4 ram, 8GB))
-    # calculate 1/2 ram and cap to 1024MB
-    # calculate 1/4 ram and cap to 8192MB
-    # pick the max
-    half_system_memory_in_mb=`expr $system_memory_in_mb / 2`
-    # The quarter is computed from the uncapped half, before the cap below is applied.
-    quarter_system_memory_in_mb=`expr $half_system_memory_in_mb / 2`
-    if [ "$half_system_memory_in_mb" -gt "1024" ]
-    then
-        half_system_memory_in_mb="1024"
-    fi
-    if [ "$quarter_system_memory_in_mb" -gt "8192" ]
-    then
-        quarter_system_memory_in_mb="8192"
-    fi
-    if [ "$half_system_memory_in_mb" -gt "$quarter_system_memory_in_mb" ]
-    then
-        max_heap_size_in_mb="$half_system_memory_in_mb"
-    else
-        max_heap_size_in_mb="$quarter_system_memory_in_mb"
-    fi
-    MAX_HEAP_SIZE="${max_heap_size_in_mb}M"
-
-    # Young gen: min(max_sensible_per_modern_cpu_core * num_cores, 1/4 * heap size)
-    max_sensible_yg_per_core_in_mb="100"
-    # "*" is quoted so the shell passes it to expr instead of expanding it as a glob.
-    max_sensible_yg_in_mb=`expr $max_sensible_yg_per_core_in_mb "*" $system_cpu_cores`
-
-    desired_yg_in_mb=`expr $max_heap_size_in_mb / 4`
-
-    if [ "$desired_yg_in_mb" -gt "$max_sensible_yg_in_mb" ]
-    then
-        HEAP_NEWSIZE="${max_sensible_yg_in_mb}M"
-    else
-        HEAP_NEWSIZE="${desired_yg_in_mb}M"
-    fi
-}
-
-# Determine the sort of JVM we'll be running on.
-# jvmver is the quoted version string from the `java -version` banner, e.g.
-# "1.8.0_77"; JVM_VERSION is the part before the last '_' and
-# JVM_PATCH_VERSION the part after the first '_'.
-java_ver_output=`"${JAVA:-java}" -version 2>&1`
-jvmver=`echo "$java_ver_output" | grep '[openjdk|java] version' | awk -F'"' 'NR==1 {print $2}'`
-JVM_VERSION=${jvmver%_*}
-JVM_PATCH_VERSION=${jvmver#*_}
-
-# Anything older than Java 8 (or no detectable version at all) is rejected.
-if [ "$JVM_VERSION" \< "1.8" ] ; then
-    echo "Cassandra 3.0 and later require Java 8u40 or later."
-    exit 1;
-fi
-
-# For Java 8 itself, additionally require update 40 or later. The update
-# number is compared numerically (a string comparison would rank "101" below
-# "40"), using only its leading digits so suffixes such as "-internal" are
-# ignored. A 1.8 version string without '_' has no update number and counts as 0.
-case "$JVM_VERSION" in
-    1.8*)
-        case "$jvmver" in
-            *_*) jvm_update=${JVM_PATCH_VERSION%%[!0-9]*} ;;
-            *)   jvm_update=0 ;;
-        esac
-
-        if [ "${jvm_update:-0}" -lt 40 ] ; then
-            echo "Cassandra 3.0 and later require Java 8u40 or later."
-            exit 1;
-        fi
-        ;;
-esac
-
-# Detect the JVM vendor and architecture from the `java -version` banner:
-# $jvm is the first word of the line that follows the "java version" line.
-# NOTE(review): this grep matches only the literal text 'java version'; for a
-# banner whose first line reads differently (the version grep earlier in this
-# file also anticipates "openjdk") $jvm stays empty and the fallback branch
-# below is taken - confirm that this is intended.
-jvm=`echo "$java_ver_output" | grep -A 1 'java version' | awk 'NR==2 {print $1}'`
-case "$jvm" in
-    OpenJDK)
-        JVM_VENDOR=OpenJDK
-        # this will be "64-Bit" or "32-Bit"
-        JVM_ARCH=`echo "$java_ver_output" | awk 'NR==3 {print $2}'`
-        ;;
-    "Java(TM)")
-        JVM_VENDOR=Oracle
-        # this will be "64-Bit" or "32-Bit"
-        JVM_ARCH=`echo "$java_ver_output" | awk 'NR==3 {print $3}'`
-        ;;
-    *)
-        # Help fill in other JVM values
-        JVM_VENDOR=other
-        JVM_ARCH=unknown
-        ;;
-esac
-
-# Override these to set the amount of memory to allocate to the JVM at
-# start-up. For production use you may wish to adjust this for your
-# environment. MAX_HEAP_SIZE is the total amount of memory dedicated
-# to the Java heap. HEAP_NEWSIZE refers to the size of the young
-# generation. Both MAX_HEAP_SIZE and HEAP_NEWSIZE should be either set
-# or not (if you set one, set the other).
-#
-# The main trade-off for the young generation is that the larger it
-# is, the longer GC pause times will be. The shorter it is, the more
-# expensive GC will be (usually).
-#
-# The example HEAP_NEWSIZE assumes a modern 8-core+ machine for decent pause
-# times. If in doubt, and if you do not particularly want to tweak, go with
-# 100 MB per physical CPU core.
-
-#MAX_HEAP_SIZE="4G"
-#HEAP_NEWSIZE="800M"
-
-# Set this to control the amount of arenas per-thread in glibc
-#export MALLOC_ARENA_MAX=4
-
-# only calculate the size if it's not set manually
-if [ "x$MAX_HEAP_SIZE" = "x" ] && [ "x$HEAP_NEWSIZE" = "x" ]; then
-    calculate_heap_sizes
-else
-    if [ "x$MAX_HEAP_SIZE" = "x" ] ||  [ "x$HEAP_NEWSIZE" = "x" ]; then
-        echo "please set or unset MAX_HEAP_SIZE and HEAP_NEWSIZE in pairs (see cassandra-env.sh)"
-        exit 1
-    fi
-fi
-
-if [ "x$MALLOC_ARENA_MAX" = "x" ] ; then
-    export MALLOC_ARENA_MAX=4
-fi
-
-#GC log path has to be defined here because it needs to access CASSANDRA_HOME
-JVM_OPTS="$JVM_OPTS -Xloggc:${CASSANDRA_HOME}/logs/gc.log"
-
-# Here we create the arguments that will get passed to the jvm when
-# starting cassandra.
-
-# Read user-defined JVM options from jvm.options file
-JVM_OPTS_FILE=$CASSANDRA_CONF/jvm.options
-for opt in `grep "^-" $JVM_OPTS_FILE`
-do
-  JVM_OPTS="$JVM_OPTS $opt"
-done
-
-# Check what parameters were defined on jvm.options file to avoid conflicts
-echo $JVM_OPTS | grep -q Xmn
-DEFINED_XMN=$?
-echo $JVM_OPTS | grep -q Xmx
-DEFINED_XMX=$?
-echo $JVM_OPTS | grep -q Xms
-DEFINED_XMS=$?
-echo $JVM_OPTS | grep -q UseConcMarkSweepGC
-USING_CMS=$?
-
-# We only set -Xms and -Xmx if they were not defined on jvm.options file
-# If defined, both Xmx and Xms should be defined together.
-if [ $DEFINED_XMX -ne 0 ] && [ $DEFINED_XMS -ne 0 ]; then
-     JVM_OPTS="$JVM_OPTS -Xms${MAX_HEAP_SIZE}"
-     JVM_OPTS="$JVM_OPTS -Xmx${MAX_HEAP_SIZE}"
-elif [ $DEFINED_XMX -ne 0 ] || [ $DEFINED_XMS -ne 0 ]; then
-     echo "Please set or unset -Xmx and -Xms flags in pairs on jvm.options file."
-     exit 1
-fi
-
-# We only set -Xmn flag if it was not defined in jvm.options file
-# and if the CMS GC is being used
-# If defined, both Xmn and Xmx should be defined together.
-if [ $DEFINED_XMN -eq 0 ] && [ $DEFINED_XMX -ne 0 ]; then
-    echo "Please set or unset -Xmx and -Xmn flags in pairs on jvm.options file."
-    exit 1
-elif [ $DEFINED_XMN -ne 0 ] && [ $USING_CMS -eq 0 ]; then
-    JVM_OPTS="$JVM_OPTS -Xmn${HEAP_NEWSIZE}"
-fi
-
-if [ "$JVM_ARCH" = "64-Bit" ] && [ $USING_CMS -eq 0 ]; then
-    JVM_OPTS="$JVM_OPTS -XX:+UseCondCardMark"
-fi
-
-# provides hints to the JIT compiler
-JVM_OPTS="$JVM_OPTS -XX:CompileCommandFile=$CASSANDRA_CONF/hotspot_compiler"
-
-# add the jamm javaagent
-JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.3.0.jar"
-
-# set jvm HeapDumpPath with CASSANDRA_HEAPDUMP_DIR
-if [ "x$CASSANDRA_HEAPDUMP_DIR" != "x" ]; then
-    JVM_OPTS="$JVM_OPTS -XX:HeapDumpPath=$CASSANDRA_HEAPDUMP_DIR/cassandra-`date +%s`-pid$$.hprof"
-fi
-
-# jmx: metrics and administration interface
-#
-# add this if you're having trouble connecting:
-# JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=<public name>"
-#
-# see
-# https://blogs.oracle.com/jmxetc/entry/troubleshooting_connection_problems_in_jconsole
-# for more on configuring JMX through firewalls, etc. (Short version:
-# get it working with no firewall first.)
-#
-# Cassandra ships with JMX accessible *only* from localhost.  
-# To enable remote JMX connections, uncomment lines below
-# with authentication and/or ssl enabled. See https://wiki.apache.org/cassandra/JmxSecurity 
-#
-# LOCAL_JMX defaults to "yes" when it is not set by the caller.
-if [ "x$LOCAL_JMX" = "x" ]; then
-    LOCAL_JMX=yes
-fi
-
-# Specifies the default port over which Cassandra will be available for
-# JMX connections.
-# For security reasons, you should not expose this port to the internet.  Firewall it if needed.
-JMX_PORT="7199"
-
-# NOTE(review): in this copy of the script the LOCAL_JMX=yes branch does NOT
-# restrict JMX to localhost: it enables the remote JMX agent on $JMX_PORT with
-# local.only=false, authentication disabled and SSL disabled, and additionally
-# switches on Flight Recorder. The stock localhost-only setting is the
-# commented-out first line of the branch. Confirm this is acceptable for the
-# environment the node runs in.
-if [ "$LOCAL_JMX" = "yes" ]; then
-#  JVM_OPTS="$JVM_OPTS -Dcassandra.jmx.local.port=$JMX_PORT -XX:+DisableExplicitGC"
-  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote"
-  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
-  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
-  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.local.only=false"
-  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false"
-  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false"
-  JVM_OPTS="$JVM_OPTS -XX:+UnlockCommercialFeatures"
-  JVM_OPTS="$JVM_OPTS -XX:+FlightRecorder"
-  JVM_OPTS="$JVM_OPTS -XX:FlightRecorderOptions=defaultrecording=true"
-else
-  # Remote JMX with password authentication (password file under
-  # /etc/cassandra); SSL stays off unless the commented options are enabled.
-  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
-  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
-  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false"
-  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=true"
-  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.password.file=/etc/cassandra/jmxremote.password"
-#  JVM_OPTS="$JVM_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore"
-#  JVM_OPTS="$JVM_OPTS -Djavax.net.ssl.keyStorePassword=<keystore-password>"
-#  JVM_OPTS="$JVM_OPTS -Djavax.net.ssl.trustStore=/path/to/truststore"
-#  JVM_OPTS="$JVM_OPTS -Djavax.net.ssl.trustStorePassword=<truststore-password>"
-#  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl.need.client.auth=true"
-#  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.registry.ssl=true"
-#  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl.enabled.protocols=<enabled-protocols>"
-#  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl.enabled.cipher.suites=<enabled-cipher-suites>"
-fi
-
-# To use mx4j, an HTML interface for JMX, add mx4j-tools.jar to the lib/
-# directory.
-# See http://wiki.apache.org/cassandra/Operations#Monitoring_with_MX4J
-# By default mx4j listens on 0.0.0.0:8081. Uncomment the following lines
-# to control its listen address and port.
-#MX4J_ADDRESS="-Dmx4jaddress=127.0.0.1"
-#MX4J_PORT="-Dmx4jport=8081"
-
-# Cassandra uses SIGAR to capture OS metrics CASSANDRA-7838
-# for SIGAR we have to set the java.library.path
-# to the location of the native libraries.
-JVM_OPTS="$JVM_OPTS -Djava.library.path=$CASSANDRA_HOME/lib/sigar-bin"
-
-JVM_OPTS="$JVM_OPTS $MX4J_ADDRESS"
-JVM_OPTS="$JVM_OPTS $MX4J_PORT"
-JVM_OPTS="$JVM_OPTS $JVM_EXTRA_OPTS"


Mime
View raw message