ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [24/41] ignite git commit: IGNITE-1371 Implemented Cassandra cache store.
Date Fri, 17 Jun 2016 08:54:33 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java
new file mode 100644
index 0000000..4e86d74
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.beans.PropertyDescriptor;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.w3c.dom.Element;
+
+/**
+ * Descriptor for Ignite key POJO class
+ */
+public class PojoKeyField extends PojoField {
+
+    /**
+     * Specifies sort order for POJO key field
+     */
+    public enum SortOrder {
+        /** Ascending sort order. */
+        ASC,
+        /** Descending sort order. */
+        DESC
+    }
+
+    /** Xml attribute specifying sort order. */
+    private static final String SORT_ATTR = "sort";
+
+    /** Sort order. */
+    private SortOrder sortOrder = null;
+
+    /**
+     * Constructs Ignite cache key POJO object descriptor.
+     *
+     * @param el xml configuration element.
+     * @param pojoCls java class of key POJO field.
+     */
+    public PojoKeyField(Element el, Class pojoCls) {
+        super(el, pojoCls);
+
+        if (el.hasAttribute(SORT_ATTR)) {
+            try {
+                sortOrder = SortOrder.valueOf(el.getAttribute(SORT_ATTR).trim().toUpperCase());
+            }
+            catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException("Incorrect sort order '" + el.getAttribute(SORT_ATTR) + "' specified");
+            }
+        }
+    }
+
+    /**
+     * Constructs Ignite cache key POJO object descriptor.
+     *
+     * @param desc property descriptor.
+     */
+    public PojoKeyField(PropertyDescriptor desc) {
+        super(desc);
+    }
+
+    /**
+     * Returns sort order for the field.
+     *
+     * @return sort order.
+     */
+    public SortOrder getSortOrder() {
+        return sortOrder;
+    }
+
+    /**
+     * Initializes descriptor from {@link QuerySqlField} annotation.
+     *
+     * @param sqlField {@link QuerySqlField} annotation.
+     */
+    protected void init(QuerySqlField sqlField) {
+        if (sqlField.descending())
+            sortOrder = SortOrder.DESC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
new file mode 100644
index 0000000..c29f1db
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.beans.PropertyDescriptor;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.w3c.dom.Element;
+
+/**
+ * Descriptor for a field of Ignite cache value POJO class.
+ */
+public class PojoValueField extends PojoField {
+    /** Xml attribute specifying that Cassandra column is static. */
+    private static final String STATIC_ATTR = "static";
+
+    /** Xml attribute specifying that secondary index should be created for Cassandra column. */
+    private static final String INDEX_ATTR = "index";
+
+    /** Xml attribute specifying secondary index custom class. */
+    private static final String INDEX_CLASS_ATTR = "indexClass";
+
+    /** Xml attribute specifying secondary index options. */
+    private static final String INDEX_OPTIONS_ATTR = "indexOptions";
+
+    /** Indicates if Cassandra column should be indexed, {@code null} when not configured. */
+    private Boolean isIndexed;
+
+    /** Custom java class for Cassandra secondary index. */
+    private String idxCls;
+
+    /** Secondary index options, stored with the leading "with" keyword. */
+    private String idxOptions;
+
+    /** Indicates if Cassandra column is static, {@code null} when not configured. */
+    private Boolean isStatic;
+
+    /**
+     * Constructs Ignite cache value field descriptor.
+     * Index options not starting with "with" get "with " or "with options = " prepended.
+     *
+     * @param el field descriptor xml configuration element.
+     * @param pojoCls field java class
+     */
+    public PojoValueField(Element el, Class pojoCls) {
+        super(el, pojoCls);
+
+        if (el.hasAttribute(STATIC_ATTR))
+            isStatic = Boolean.parseBoolean(el.getAttribute(STATIC_ATTR).trim());
+
+        if (el.hasAttribute(INDEX_ATTR))
+            isIndexed = Boolean.parseBoolean(el.getAttribute(INDEX_ATTR).trim());
+
+        if (el.hasAttribute(INDEX_CLASS_ATTR))
+            idxCls = el.getAttribute(INDEX_CLASS_ATTR).trim();
+
+        if (el.hasAttribute(INDEX_OPTIONS_ATTR)) {
+            idxOptions = el.getAttribute(INDEX_OPTIONS_ATTR).trim();
+
+            // regionMatches(ignoreCase) does not depend on the default locale, while toLowerCase() maps
+            // 'I' to dotless 'i' under e.g. Turkish locale and makes "WITH"/"OPTIONS" prefixes undetectable.
+            if (!idxOptions.regionMatches(true, 0, "with", 0, 4)) {
+                idxOptions = idxOptions.regionMatches(true, 0, "options", 0, 7) ?
+                    "with " + idxOptions :
+                    "with options = " + idxOptions;
+            }
+        }
+    }
+
+    /**
+     * Constructs Ignite cache value field descriptor from a bean property.
+     *
+     * @param desc field property descriptor.
+     */
+    public PojoValueField(PropertyDescriptor desc) {
+        super(desc);
+    }
+
+    /**
+     * Returns DDL for Cassandra columns corresponding to POJO field.
+     *
+     * @return columns DDL, with " static" appended for static columns.
+     */
+    @Override public String getColumnDDL() {
+        String colDDL = super.getColumnDDL();
+
+        if (isStatic != null && isStatic)
+            colDDL = colDDL + " static";
+
+        return colDDL;
+    }
+
+    /**
+     * Indicates if secondary index should be created for the field.
+     *
+     * @return true/false if secondary index should/shouldn't be created for the field.
+     */
+    public boolean isIndexed() {
+        return isIndexed != null && isIndexed;
+    }
+
+    /**
+     * Returns DDL for the field secondary index.
+     *
+     * @param keyspace Cassandra keyspace where index should be created.
+     * @param tbl Cassandra table for which secondary index should be created.
+     *
+     * @return secondary index DDL, or {@code null} if the field should not be indexed.
+     */
+    public String getIndexDDL(String keyspace, String tbl) {
+        if (isIndexed == null || !isIndexed)
+            return null;
+
+        // A custom index class switches the statement to its "create custom index ... using" form.
+        StringBuilder builder = new StringBuilder(idxCls != null ? "create custom index" : "create index");
+
+        builder.append(" if not exists on ").append(keyspace).append(".").append(tbl);
+        builder.append(" (").append(getColumn()).append(")");
+
+        if (idxCls != null)
+            builder.append(" using '").append(idxCls).append("'");
+
+        if (idxOptions != null)
+            builder.append(" ").append(idxOptions);
+
+        return builder.append(";").toString();
+    }
+
+    /**
+     * Initializes descriptor from {@link QuerySqlField} annotation: an indexed annotation marks the field indexed.
+     *
+     * @param sqlField {@link QuerySqlField} annotation.
+     */
+    protected void init(QuerySqlField sqlField) {
+        if (sqlField.index())
+            isIndexed = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java
new file mode 100644
index 0000000..877167d
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.beans.PropertyDescriptor;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ * Persistence settings describing how Ignite cache value is stored.
+ */
+public class ValuePersistenceSettings extends PersistenceSettings {
+    /** Name of the XML element which describes a single value field. */
+    private static final String FIELD_ELEMENT = "field";
+
+    /** Fields of the value POJO, left empty unless POJO persistence strategy is used. */
+    private List<PojoField> fields = new LinkedList<>();
+
+    /**
+     * Builds value persistence settings from their XML description.
+     *
+     * @param el XML element describing value persistence settings.
+     */
+    public ValuePersistenceSettings(Element el) {
+        super(el);
+
+        // Field mapping is resolved for POJO persistence strategy only.
+        if (PersistenceStrategy.POJO.equals(getStrategy())) {
+            fields = detectFields(el.getElementsByTagName(FIELD_ELEMENT));
+
+            if (fields.isEmpty()) {
+                throw new IllegalStateException("Failed to initialize value fields for class '" +
+                    getJavaClass().getName() + "'");
+            }
+
+            checkDuplicates(fields);
+        }
+    }
+
+    /**
+     * @return Read-only view of the value fields.
+     */
+    public List<PojoField> getFields() {
+        if (fields == null)
+            return null;
+
+        return Collections.unmodifiableList(fields);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String defaultColumnName() {
+        return "value";
+    }
+
+    /**
+     * Builds POJO field descriptors out of the XML field nodes of the value settings.
+     *
+     * @param fieldNodes Field nodes to process, may be {@code null} or empty.
+     * @return POJO fields list.
+     */
+    private List<PojoField> detectFields(NodeList fieldNodes) {
+        List<PojoField> res = new LinkedList<>();
+        int nodesCnt = fieldNodes == null ? 0 : fieldNodes.getLength();
+
+        if (nodesCnt == 0) {
+            // Nothing is mapped explicitly in XML - fall back to the properties discovered on the class itself.
+            for (PropertyDescriptor desc : PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true))
+                res.add(new PojoValueField(desc));
+            return res;
+        }
+
+        List<PropertyDescriptor> known = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), false);
+
+        for (int idx = 0; idx < nodesCnt; idx++) {
+            PojoValueField field = new PojoValueField((Element)fieldNodes.item(idx), getJavaClass());
+
+            // Every field listed in XML has to match a property of the value class.
+            if (findPropertyDescriptor(known, field.getName()) == null) {
+                throw new IllegalArgumentException("Specified POJO field '" + field.getName() +
+                    "' doesn't exist in '" + getJavaClass().getName() + "' class");
+            }
+
+            res.add(field);
+        }
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java
new file mode 100644
index 0000000..e9f93a0
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Serializer based on standard Java serialization.
+ */
+public class JavaSerializer implements Serializer {
+    /** */
+    private static final int DFLT_BUFFER_SIZE = 4096;
+
+    /** {@inheritDoc} */
+    @Override public ByteBuffer serialize(Object obj) {
+        if (obj == null)
+            return null;
+
+        ByteArrayOutputStream stream = null;
+        ObjectOutputStream out = null;
+
+        try {
+            stream = new ByteArrayOutputStream(DFLT_BUFFER_SIZE);
+
+            out = new ObjectOutputStream(stream);
+            out.writeObject(obj);
+            out.flush();
+
+            return ByteBuffer.wrap(stream.toByteArray());
+        }
+        catch (IOException e) {
+            throw new IgniteException("Failed to serialize object of the class '" + obj.getClass().getName() + "'", e);
+        }
+        finally {
+            U.closeQuiet(out);
+            U.closeQuiet(stream);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object deserialize(ByteBuffer buf) {
+        ByteArrayInputStream stream = null;
+        ObjectInputStream in = null;
+
+        try {
+            stream = new ByteArrayInputStream(buf.array());
+            in = new ObjectInputStream(stream);
+
+            return in.readObject();
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to deserialize object from byte stream", e);
+        }
+        finally {
+            U.closeQuiet(in);
+            U.closeQuiet(stream);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/KryoSerializer.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/KryoSerializer.java
new file mode 100644
index 0000000..88379de
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/KryoSerializer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.serializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Serializer based on Kryo serialization.
+ */
+public class KryoSerializer implements Serializer {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private static final int DFLT_BUFFER_SIZE = 4096;
+
+    /** Thread local instance of {@link com.esotericsoftware.kryo.Kryo} */
+    private transient ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
+        protected Kryo initialValue() {
+            return new Kryo();
+        }
+    };
+
+    /** {@inheritDoc} */
+    @Override public ByteBuffer serialize(Object obj) {
+        if (obj == null)
+            return null;
+
+        ByteArrayOutputStream stream = null;
+
+        Output out = null;
+
+        try {
+            stream = new ByteArrayOutputStream(DFLT_BUFFER_SIZE);
+
+            out = new Output(stream);
+
+            kryos.get().writeClassAndObject(out, obj);
+            out.flush();
+
+            return ByteBuffer.wrap(stream.toByteArray());
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to serialize object of the class '" + obj.getClass().getName() + "'", e);
+        }
+        finally {
+            U.closeQuiet(out);
+            U.closeQuiet(stream);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object deserialize(ByteBuffer buf) {
+        ByteArrayInputStream stream = null;
+        Input in = null;
+
+        try {
+            stream = new ByteArrayInputStream(buf.array());
+            in = new Input(stream);
+
+            return kryos.get().readClassAndObject(in);
+        }
+        catch (Throwable e) {
+            throw new IgniteException("Failed to deserialize object from byte stream", e);
+        }
+        finally {
+            U.closeQuiet(in);
+            U.closeQuiet(stream);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java
new file mode 100644
index 0000000..5b8d542
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.serializer;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * Contract for serializers which write/load data to/from Cassandra in binary (BLOB) format.
+ * Extends {@link Serializable}, so every implementation has to be serializable itself.
+ */
+public interface Serializer extends Serializable {
+    /**
+     * Serializes an object into a byte buffer.
+     *
+     * @param obj Object to serialize.
+     * @return Byte buffer holding the binary representation of the object.
+     */
+    public ByteBuffer serialize(Object obj);
+
+    /**
+     * Deserializes an object from a byte buffer.
+     *
+     * @param buf Byte buffer holding the binary representation of an object.
+     * @return Deserialized object.
+     */
+    public Object deserialize(ByteBuffer buf);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java
new file mode 100644
index 0000000..e43db1d
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+
+/**
+ * Provides information for batch operations (loadAll, deleteAll, writeAll) of Ignite cache
+ * backed by {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}.
+ *
+ * @param <R> type of the result returned from batch operation.
+ * @param <V> type of the value used in batch operation.
+ */
+public interface BatchExecutionAssistant<R, V> {
+    /**
+     * Indicates if existence of Cassandra tables is required for this batch operation.
+     *
+     * @return {@code true} if table existence is required.
+     */
+    public boolean tableExistenceRequired();
+
+    /**
+     * Returns the unbound CQL statement to be executed inside the batch operation.
+     *
+     * @return Unbound CQL statement.
+     */
+    public String getStatement();
+
+    /**
+     * Binds the parameters taken from the specified object to the prepared statement.
+     *
+     * @param statement Statement.
+     * @param obj Parameters for statement binding.
+     * @return Bound statement.
+     */
+    public BoundStatement bindStatement(PreparedStatement statement, V obj);
+
+    /**
+     * Returns Ignite cache key/value persistence settings.
+     *
+     * @return persistence settings.
+     */
+    public KeyValuePersistenceSettings getPersistenceSettings();
+
+    /**
+     * Returns display name of the batch operation.
+     *
+     * @return Operation display name.
+     */
+    public String operationName();
+
+    /**
+     * Processes a particular row inside the batch operation.
+     *
+     * @param row Row to process.
+     * @param seqNum Sequential number of the row.
+     */
+    public void process(Row row, int seqNum);
+
+    /**
+     * Checks if the row/object with the specified sequential number is already processed.
+     *
+     * @param seqNum object sequential number.
+     * @return {@code true} if the object is already processed.
+     */
+    public boolean alreadyProcessed(int seqNum);
+
+    /**
+     * @return number of already processed objects/rows.
+     */
+    public int processedCount();
+
+    /**
+     * @return result of the batch operation.
+     */
+    public R processedData();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java
new file mode 100644
index 0000000..387c98f
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
+
+/**
+ * Provides information for loadCache operation of {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}.
+ */
+public interface BatchLoaderAssistant {
+    /**
+     * Returns the name of the batch load operation.
+     *
+     * @return operation name.
+     */
+    public String operationName();
+
+    /**
+     * Returns the CQL statement to use in the batch load operation.
+     *
+     * @return CQL statement for the batch load operation.
+     */
+    public Statement getStatement();
+
+    /**
+     * Processes each row returned by the batch load operation.
+     *
+     * @param row row selected from Cassandra table.
+     */
+    public void process(Row row);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
new file mode 100644
index 0000000..506982f
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import java.io.Closeable;
+
+/**
+ * Wrapper around Cassandra driver session, to automatically handle:
+ * <ul>
+ *  <li>Keyspace and table absence exceptions</li>
+ *  <li>Timeout exceptions</li>
+ *  <li>Batch operations</li>
+ * </ul>
+ */
+public interface CassandraSession extends Closeable {
+    /**
+     * Executes a single synchronous operation against Cassandra database.
+     *
+     * @param assistant execution assistant performing the main operation logic.
+     * @param <V> type of the result returned from operation.
+     *
+     * @return result of the operation.
+     */
+    public <V> V execute(ExecutionAssistant<V> assistant);
+
+    /**
+     * Executes a batch asynchronous operation against Cassandra database.
+     *
+     * @param assistant execution assistant performing the main operation logic.
+     * @param data data which should be processed in batch operation.
+     * @param <R> type of the result returned from batch operation.
+     * @param <V> type of the value used in batch operation.
+     *
+     * @return result of the operation.
+     */
+    public <R, V> R execute(BatchExecutionAssistant<R, V> assistant, Iterable<? extends V> data);
+
+    /**
+     * Executes a batch asynchronous operation to load a bunch of records
+     * specified by a CQL statement from Cassandra database.
+     *
+     * @param assistant execution assistant performing the main operation logic.
+     */
+    public void execute(BatchLoaderAssistant assistant);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
new file mode 100644
index 0000000..95b8581
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
@@ -0,0 +1,832 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.datastax.driver.core.querybuilder.Batch;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
+import org.apache.ignite.cache.store.cassandra.common.RandomSleeper;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+import org.apache.ignite.cache.store.cassandra.session.pool.SessionPool;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+
+/**
+ * Implementation for {@link org.apache.ignite.cache.store.cassandra.session.CassandraSession}.
+ * <p>
+ * Wraps a Cassandra driver session and retries failed CQL operations a limited number of times,
+ * reacting to table absence, hosts availability and prepared statement errors between the attempts.
+ */
+public class CassandraSessionImpl implements CassandraSession {
+    /** Number of CQL query execution attempts. */
+    private static final int CQL_EXECUTION_ATTEMPTS_COUNT = 20;
+
+    /** Min timeout between CQL query execution attempts (presumably milliseconds, like the max timeout). */
+    private static final int CQL_EXECUTION_ATTEMPT_MIN_TIMEOUT = 100;
+
+    /** Max timeout between CQL query execution attempts, in milliseconds. */
+    private static final int CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT = 500;
+
+    /** Timeout increment for CQL query execution attempts (presumably milliseconds). */
+    private static final int CQL_ATTEMPTS_TIMEOUT_INCREMENT = 100;
+
+    /** Cassandra cluster builder, used to open a new driver session when none is available. */
+    private volatile Cluster.Builder builder;
+
+    /** Cassandra driver session, {@code null} until it is first requested or after it was released. */
+    private volatile Session ses;
+
+    /** Number of references to Cassandra driver session (for multithreaded environment). */
+    private volatile int refCnt = 0;
+
+    /**
+     * Storage for the session prepared statements, keyed by CQL statement text.
+     * Guarded by its own monitor.
+     * NOTE(review): the map is static, so it is shared by all instances of this class, while it is
+     * cleared whenever any single instance reopens its driver session - confirm this is intended.
+     */
+    private static final Map<String, PreparedStatement> sesStatements = new HashMap<>();
+
+    /** Number of records to immediately fetch in CQL statement execution, {@code null} to keep driver default. */
+    private Integer fetchSize;
+
+    /** Consistency level for Cassandra READ operations (select), {@code null} to keep driver default. */
+    private ConsistencyLevel readConsistency;
+
+    /** Consistency level for Cassandra WRITE operations (insert/update/delete), {@code null} to keep driver default. */
+    private ConsistencyLevel writeConsistency;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Table absence error handlers counter, {@code -1} when no handler is running. */
+    private final AtomicInteger tblAbsenceHandlersCnt = new AtomicInteger(-1);
+
+    /** Prepared statement cluster disconnection error handlers counter, {@code -1} when no handler is running. */
+    private final AtomicInteger prepStatementHandlersCnt = new AtomicInteger(-1);
+
+    /**
+     * Creates a wrapper around Cassandra driver session.
+     * <p>
+     * No connection is established here, the arguments are only remembered for later use.
+     *
+     * @param builder Builder for Cassandra cluster.
+     * @param fetchSize Number of rows to immediately fetch in CQL statement execution.
+     * @param readConsistency Consistency level for Cassandra READ operations (select).
+     * @param writeConsistency Consistency level for Cassandra WRITE operations (insert/update/delete).
+     * @param log Logger.
+     */
+    public CassandraSessionImpl(Cluster.Builder builder, Integer fetchSize, ConsistencyLevel readConsistency,
+        ConsistencyLevel writeConsistency, IgniteLogger log) {
+        this.log = log;
+        this.builder = builder;
+
+        // Statement execution options.
+        this.readConsistency = readConsistency;
+        this.writeConsistency = writeConsistency;
+        this.fetchSize = fetchSize;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The statement is prepared, bound and executed up to {@code CQL_EXECUTION_ATTEMPTS_COUNT} times.
+     * Only the first row of the result set (if any) is handed to the assistant.
+     */
+    @Override public <V> V execute(ExecutionAssistant<V> assistant) {
+        Throwable error = null;
+        String errorMsg = "Failed to execute Cassandra CQL statement: " + assistant.getStatement();
+
+        RandomSleeper sleeper = newSleeper();
+
+        incrementSessionRefs();
+
+        try {
+            for (int attempt = 0; attempt < CQL_EXECUTION_ATTEMPTS_COUNT; attempt++) {
+                error = null;
+
+                if (attempt > 0) {
+                    log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra CQL statement: " +
+                            assistant.getStatement());
+                }
+
+                try {
+                    PreparedStatement preparedSt = prepareStatement(assistant.getStatement(),
+                        assistant.getPersistenceSettings(), assistant.tableExistenceRequired());
+
+                    // Statement could not be prepared because the table is absent and not required.
+                    if (preparedSt == null)
+                        return null;
+
+                    BoundStatement boundSt = assistant.bindStatement(preparedSt);
+                    Statement tunedSt = tuneStatementExecutionOptions(boundSt);
+                    ResultSet res = session().execute(tunedSt);
+
+                    if (res == null || !res.iterator().hasNext())
+                        return null;
+
+                    Row firstRow = res.iterator().next();
+
+                    if (firstRow == null)
+                        return null;
+
+                    return assistant.process(firstRow);
+                }
+                catch (Throwable e) {
+                    error = e;
+
+                    if (CassandraHelper.isTableAbsenceError(e)) {
+                        // Absent table is not a problem for operations which do not require it.
+                        if (!assistant.tableExistenceRequired()) {
+                            log.warning(errorMsg, e);
+                            return null;
+                        }
+
+                        handleTableAbsenceError(assistant.getPersistenceSettings());
+                    }
+                    else if (CassandraHelper.isHostsAvailabilityError(e))
+                        handleHostsAvailabilityError(e, attempt, errorMsg);
+                    else if (CassandraHelper.isPreparedStatementClusterError(e))
+                        handlePreparedStatementClusterError(e);
+                    else {
+                        // Unknown kind of error: no further attempts, terminate right away.
+                        throw new IgniteException(errorMsg, e);
+                    }
+                }
+
+                sleeper.sleep();
+            }
+        }
+        catch (Throwable e) {
+            error = e;
+        }
+        finally {
+            decrementSessionRefs();
+        }
+
+        // Reached only when all attempts failed or an unrecoverable error was raised.
+        log.error(errorMsg, error);
+
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Each attempt works in two phases: first an asynchronous statement is submitted for every element
+     * which the assistant did not report as already processed, then the submitted futures are awaited and
+     * their first rows are handed to the assistant. Recoverable errors (table absence, hosts availability,
+     * prepared statement cluster errors) trigger the corresponding handler and another attempt, any other
+     * error terminates the operation.
+     */
+    @Override public <R, V> R execute(BatchExecutionAssistant<R, V> assistant, Iterable<? extends V> data) {
+        if (data == null || !data.iterator().hasNext())
+            return assistant.processedData();
+
+        int attempt = 0;
+        String errorMsg = "Failed to execute Cassandra " + assistant.operationName() + " operation";
+        Throwable error = new IgniteException(errorMsg);
+
+        RandomSleeper sleeper = newSleeper();
+
+        // Total number of elements in data, known only after the first full iteration.
+        int dataSize = 0;
+
+        incrementSessionRefs();
+
+        try {
+            while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+                if (attempt != 0) {
+                    log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra batch " +
+                            assistant.operationName() + " operation to process rest " +
+                            (dataSize - assistant.processedCount()) + " of " + dataSize + " elements");
+                }
+
+                // Clean errors info before next communication with Cassandra.
+                Throwable unknownEx = null;
+                Throwable tblAbsenceEx = null;
+                Throwable hostsAvailEx = null;
+                Throwable prepStatEx = null;
+
+                // Pairs of (element sequence number, future of its statement execution).
+                List<Cache.Entry<Integer, ResultSetFuture>> futResults = new LinkedList<>();
+
+                PreparedStatement preparedSt = prepareStatement(assistant.getStatement(),
+                    assistant.getPersistenceSettings(), assistant.tableExistenceRequired());
+
+                // NOTE(review): other "table absent and not required" exits of this method return
+                // assistant.processedData() instead of null - confirm which one callers expect.
+                if (preparedSt == null)
+                    return null;
+
+                int seqNum = 0;
+
+                // Phase 1: submit asynchronous statements for all elements which are not processed yet.
+                for (V obj : data) {
+                    if (!assistant.alreadyProcessed(seqNum)) {
+                        try {
+                            Statement statement = tuneStatementExecutionOptions(assistant.bindStatement(preparedSt, obj));
+                            ResultSetFuture fut = session().executeAsync(statement);
+                            futResults.add(new CacheEntryImpl<>(seqNum, fut));
+                        }
+                        catch (Throwable e) {
+                            if (CassandraHelper.isTableAbsenceError(e)) {
+                                // If there are table absence error and it is not required for the operation we can return.
+                                if (!assistant.tableExistenceRequired())
+                                    return assistant.processedData();
+
+                                tblAbsenceEx = e;
+                                handleTableAbsenceError(assistant.getPersistenceSettings());
+                            }
+                            else if (CassandraHelper.isHostsAvailabilityError(e)) {
+                                // The check must happen BEFORE the error is remembered, otherwise
+                                // the handler would never be invoked.
+                                boolean firstHostsAvailErr = hostsAvailEx == null;
+
+                                hostsAvailEx = e;
+
+                                // Handle host availability only once per attempt.
+                                if (firstHostsAvailErr)
+                                    handleHostsAvailabilityError(e, attempt, errorMsg);
+                            }
+                            else if (CassandraHelper.isPreparedStatementClusterError(e)) {
+                                prepStatEx = e;
+                                handlePreparedStatementClusterError(e);
+                            }
+                            else
+                                unknownEx = e;
+                        }
+                    }
+
+                    seqNum++;
+                }
+
+                dataSize = seqNum;
+
+                // For an error which we don't know how to handle, we will not try next attempts and terminate.
+                if (unknownEx != null)
+                    throw new IgniteException(errorMsg, unknownEx);
+
+                // Remembering any of last errors.
+                if (tblAbsenceEx != null)
+                    error = tblAbsenceEx;
+                else if (hostsAvailEx != null)
+                    error = hostsAvailEx;
+                else if (prepStatEx != null)
+                    error = prepStatEx;
+
+                // Clean errors info before next communication with Cassandra.
+                unknownEx = null;
+                tblAbsenceEx = null;
+                hostsAvailEx = null;
+                prepStatEx = null;
+
+                // Phase 2: wait for the submitted statements and pass their first rows to the assistant.
+                for (Cache.Entry<Integer, ResultSetFuture> futureResult : futResults) {
+                    try {
+                        ResultSet resSet = futureResult.getValue().getUninterruptibly();
+                        Row row = resSet != null && resSet.iterator().hasNext() ? resSet.iterator().next() : null;
+
+                        if (row != null)
+                            assistant.process(row, futureResult.getKey());
+                    }
+                    catch (Throwable e) {
+                        // Errors are only classified here, handlers are invoked once after the loop.
+                        if (CassandraHelper.isTableAbsenceError(e))
+                            tblAbsenceEx = e;
+                        else if (CassandraHelper.isHostsAvailabilityError(e))
+                            hostsAvailEx = e;
+                        else if (CassandraHelper.isPreparedStatementClusterError(e))
+                            prepStatEx = e;
+                        else
+                            unknownEx = e;
+                    }
+                }
+
+                // For an error which we don't know how to handle, we will not try next attempts and terminate.
+                if (unknownEx != null)
+                    throw new IgniteException(errorMsg, unknownEx);
+
+                // If there are no errors occurred it means that operation successfully completed and we can return.
+                if (tblAbsenceEx == null && hostsAvailEx == null && prepStatEx == null)
+                    return assistant.processedData();
+
+                if (tblAbsenceEx != null) {
+                    // If there are table absence error and it is not required for the operation we can return.
+                    if (!assistant.tableExistenceRequired())
+                        return assistant.processedData();
+
+                    error = tblAbsenceEx;
+                    handleTableAbsenceError(assistant.getPersistenceSettings());
+                }
+
+                if (hostsAvailEx != null) {
+                    error = hostsAvailEx;
+                    handleHostsAvailabilityError(hostsAvailEx, attempt, errorMsg);
+                }
+
+                if (prepStatEx != null) {
+                    error = prepStatEx;
+                    handlePreparedStatementClusterError(prepStatEx);
+                }
+
+                sleeper.sleep();
+
+                attempt++;
+            }
+        }
+        catch (Throwable e) {
+            error = e;
+        }
+        finally {
+            decrementSessionRefs();
+        }
+
+        // Reached only when all attempts failed or an unrecoverable error was raised.
+        errorMsg = "Failed to process " + (dataSize - assistant.processedCount()) +
+            " of " + dataSize + " elements, during " + assistant.operationName() +
+            " operation with Cassandra";
+
+        log.error(errorMsg, error);
+
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Every row returned by the assistant's statement is passed to the assistant. A table absence
+     * error ends the load silently, as there is nothing to load in that case.
+     */
+    @Override public void execute(BatchLoaderAssistant assistant) {
+        String errorMsg = "Failed to execute Cassandra " + assistant.operationName() + " operation";
+        Throwable error = new IgniteException(errorMsg);
+
+        RandomSleeper sleeper = newSleeper();
+
+        incrementSessionRefs();
+
+        try {
+            for (int attempt = 0; attempt < CQL_EXECUTION_ATTEMPTS_COUNT; attempt++) {
+                if (attempt > 0)
+                    log.warning("Trying " + (attempt + 1) + " attempt to load Ignite cache");
+
+                Statement tunedSt = tuneStatementExecutionOptions(assistant.getStatement());
+
+                try {
+                    ResultSet rows = session().executeAsync(tunedSt).getUninterruptibly();
+
+                    if (rows != null && rows.iterator().hasNext()) {
+                        for (Row row : rows)
+                            assistant.process(row);
+                    }
+
+                    return;
+                }
+                catch (Throwable e) {
+                    error = e;
+
+                    // Absent table means there is no data to load.
+                    if (CassandraHelper.isTableAbsenceError(e))
+                        return;
+
+                    if (CassandraHelper.isHostsAvailabilityError(e))
+                        handleHostsAvailabilityError(e, attempt, errorMsg);
+                    else if (CassandraHelper.isPreparedStatementClusterError(e))
+                        handlePreparedStatementClusterError(e);
+                    else {
+                        // Unknown kind of error: no further attempts, terminate right away.
+                        throw new IgniteException(errorMsg, e);
+                    }
+                }
+
+                sleeper.sleep();
+            }
+        }
+        catch (Throwable e) {
+            error = e;
+        }
+        finally {
+            decrementSessionRefs();
+        }
+
+        // Reached only when all attempts failed or an unrecoverable error was raised.
+        log.error(errorMsg, error);
+
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The driver session is handed over to {@link SessionPool} once nobody references it anymore.
+     */
+    @Override public synchronized void close() throws IOException {
+        // Somebody still uses the session, or there is no session to release.
+        if (decrementSessionRefs() != 0 || ses == null)
+            return;
+
+        SessionPool.put(this, ses);
+        ses = null;
+    }
+
+    /**
+     * Recreates Cassandra driver session.
+     * <p>
+     * Closes the current driver session, obtains a new one through {@link #session()} and
+     * drops all cached prepared statements.
+     */
+    private synchronized void refresh() {
+        // Make sure that the session is removed from the pool (the returned session is intentionally ignored).
+        SessionPool.get(this);
+
+        // Closing and reopening the session.
+        CassandraHelper.closeSession(ses);
+        ses = null;
+        session();
+
+        // Cached prepared statements are dropped together with the session they were created with.
+        synchronized (sesStatements) {
+            sesStatements.clear();
+        }
+    }
+
+    /**
+     * Returns the current Cassandra driver session, obtaining one on demand.
+     * <p>
+     * Lookup order: the session already held by this wrapper, then the session returned by
+     * {@link SessionPool} for this wrapper, and finally a new connection created through the cluster builder.
+     *
+     * @return Cassandra driver session.
+     */
+    private synchronized Session session() {
+        if (ses == null) {
+            ses = SessionPool.get(this);
+
+            if (ses == null) {
+                // Drop cached prepared statements before a new driver session is opened.
+                synchronized (sesStatements) {
+                    sesStatements.clear();
+                }
+
+                try {
+                    ses = builder.build().connect();
+                }
+                catch (Throwable e) {
+                    throw new IgniteException("Failed to establish session with Cassandra database", e);
+                }
+            }
+        }
+
+        return ses;
+    }
+
+    /**
+     * Registers one more user of the Cassandra driver session (required for multithreaded environment).
+     */
+    private synchronized void incrementSessionRefs() {
+        refCnt = refCnt + 1;
+    }
+
+    /**
+     * Unregisters one user of the Cassandra driver session (required for multithreaded environment).
+     * The counter is left untouched when it is already zero.
+     *
+     * @return Number of references to the session left after the decrement.
+     */
+    private synchronized int decrementSessionRefs() {
+        if (refCnt == 0)
+            return 0;
+
+        refCnt -= 1;
+
+        return refCnt;
+    }
+
+    /**
+     * Prepares CQL statement using current Cassandra driver session.
+     * <p>
+     * Already prepared statements are served from the statements cache, newly prepared ones are added to it.
+     *
+     * @param statement CQL statement.
+     * @param settings Persistence settings.
+     * @param tblExistenceRequired Flag indicating if table existence is required for the statement.
+     * @return Prepared statement, or {@code null} if the table is absent and its existence is not required.
+     */
+    private PreparedStatement prepareStatement(String statement, KeyValuePersistenceSettings settings,
+        boolean tblExistenceRequired) {
+
+        Throwable error = null;
+        String errorMsg = "Failed to prepare Cassandra CQL statement: " + statement;
+
+        RandomSleeper sleeper = newSleeper();
+
+        incrementSessionRefs();
+
+        try {
+            // Fast path: statement was prepared earlier.
+            synchronized (sesStatements) {
+                if (sesStatements.containsKey(statement))
+                    return sesStatements.get(statement);
+            }
+
+            for (int attempt = 0; attempt < CQL_EXECUTION_ATTEMPTS_COUNT; attempt++) {
+                try {
+                    PreparedStatement prepared = session().prepare(statement);
+
+                    synchronized (sesStatements) {
+                        sesStatements.put(statement, prepared);
+                    }
+
+                    return prepared;
+                }
+                catch (Throwable e) {
+                    boolean tblAbsent = CassandraHelper.isTableAbsenceError(e);
+
+                    // Absent table is not a problem for statements which do not require it.
+                    if (tblAbsent && !tblExistenceRequired)
+                        return null;
+
+                    if (tblAbsent)
+                        handleTableAbsenceError(settings);
+                    else if (CassandraHelper.isHostsAvailabilityError(e))
+                        handleHostsAvailabilityError(e, attempt, errorMsg);
+                    else
+                        throw new IgniteException(errorMsg, e);
+
+                    error = e;
+                }
+
+                sleeper.sleep();
+            }
+        }
+        finally {
+            decrementSessionRefs();
+        }
+
+        // All attempts failed.
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /**
+     * Creates Cassandra keyspace.
+     *
+     * @param settings Persistence settings.
+     */
+    private void createKeyspace(KeyValuePersistenceSettings settings) {
+        int attempt = 0;
+        Throwable error = null;
+        String errorMsg = "Failed to create Cassandra keyspace '" + settings.getKeyspace() + "'";
+
+        while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+            try {
+                log.info("-----------------------------------------------------------------------");
+                log.info("Creating Cassandra keyspace '" + settings.getKeyspace() + "'");
+                log.info("-----------------------------------------------------------------------\n\n" +
+                    settings.getKeyspaceDDLStatement() + "\n");
+                log.info("-----------------------------------------------------------------------");
+                session().execute(settings.getKeyspaceDDLStatement());
+                log.info("Cassandra keyspace '" + settings.getKeyspace() + "' was successfully created");
+                return;
+            }
+            catch (AlreadyExistsException ignored) {
+                log.info("Cassandra keyspace '" + settings.getKeyspace() + "' already exist");
+                return;
+            }
+            catch (Throwable e) {
+                if (!CassandraHelper.isHostsAvailabilityError(e))
+                    throw new IgniteException(errorMsg, e);
+
+                handleHostsAvailabilityError(e, attempt, errorMsg);
+
+                error = e;
+            }
+
+            attempt++;
+        }
+
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /**
+     * Creates Cassandra table.
+     *
+     * @param settings Persistence settings.
+     */
+    private void createTable(KeyValuePersistenceSettings settings) {
+        int attempt = 0;
+        Throwable error = null;
+        String errorMsg = "Failed to create Cassandra table '" + settings.getTableFullName() + "'";
+
+        while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+            try {
+                log.info("-----------------------------------------------------------------------");
+                log.info("Creating Cassandra table '" + settings.getTableFullName() + "'");
+                log.info("-----------------------------------------------------------------------\n\n" +
+                    settings.getTableDDLStatement() + "\n");
+                log.info("-----------------------------------------------------------------------");
+                session().execute(settings.getTableDDLStatement());
+                log.info("Cassandra table '" + settings.getTableFullName() + "' was successfully created");
+                return;
+            }
+            catch (AlreadyExistsException ignored) {
+                log.info("Cassandra table '" + settings.getTableFullName() + "' already exist");
+                return;
+            }
+            catch (Throwable e) {
+                if (!CassandraHelper.isHostsAvailabilityError(e) && !CassandraHelper.isKeyspaceAbsenceError(e))
+                    throw new IgniteException(errorMsg, e);
+
+                if (CassandraHelper.isKeyspaceAbsenceError(e)) {
+                    log.warning("Failed to create Cassandra table '" + settings.getTableFullName() +
+                        "' cause appropriate keyspace doesn't exist", e);
+                    createKeyspace(settings);
+                }
+                else if (CassandraHelper.isHostsAvailabilityError(e))
+                    handleHostsAvailabilityError(e, attempt, errorMsg);
+
+                error = e;
+            }
+
+            attempt++;
+        }
+
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /**
+     * Creates Cassandra table indexes by executing every index DDL statement from the persistence settings.
+     * <p>
+     * Does nothing if the settings define no index statements. Already existing indexes are skipped.
+     * Hosts availability errors are retried, a table absence error leads to table creation followed
+     * by another attempt.
+     *
+     * @param settings Persistence settings.
+     */
+    private void createTableIndexes(KeyValuePersistenceSettings settings) {
+        if (settings.getIndexDDLStatements() == null || settings.getIndexDDLStatements().isEmpty())
+            return;
+
+        int attempt = 0;
+        Throwable error = null;
+        String errorMsg = "Failed to create indexes for Cassandra table " + settings.getTableFullName();
+
+        while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+            try {
+                log.info("Creating indexes for Cassandra table '" + settings.getTableFullName() + "'");
+
+                for (String statement : settings.getIndexDDLStatements()) {
+                    try {
+                        session().execute(statement);
+                    }
+                    catch (AlreadyExistsException ignored) {
+                        // Index is already there, nothing to do.
+                    }
+                    catch (Throwable e) {
+                        // Constant-first comparison: exception message may be null, and an NPE raised here
+                        // would replace the real cause of the failure.
+                        boolean idxExists = e instanceof InvalidQueryException &&
+                            "Index already exists".equals(e.getMessage());
+
+                        // Rethrown wrapped, so that the outer handler decides whether it is worth a retry.
+                        if (!idxExists)
+                            throw new IgniteException(errorMsg, e);
+                    }
+                }
+
+                log.info("Indexes for Cassandra table '" + settings.getTableFullName() + "' were successfully created");
+
+                return;
+            }
+            catch (Throwable e) {
+                // NOTE(review): e is the wrapping IgniteException here, so the checks below presumably
+                // inspect the cause chain - confirm against CassandraHelper.
+                if (CassandraHelper.isHostsAvailabilityError(e))
+                    handleHostsAvailabilityError(e, attempt, errorMsg);
+                else if (CassandraHelper.isTableAbsenceError(e))
+                    createTable(settings);
+                else
+                    throw new IgniteException(errorMsg, e);
+
+                error = e;
+            }
+
+            attempt++;
+        }
+
+        // All attempts failed.
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /**
+     * Tunes CQL statement execution options (consistency level, fetch option and etc.).
+     * <p>
+     * The statement is classified as read or write by the first keyword of its query string
+     * (batches are always treated as writes). The configured consistency level and fetch size are
+     * applied only when they are not {@code null}.
+     *
+     * @param statement Statement.
+     * @return The same statement instance with the options applied.
+     */
+    private Statement tuneStatementExecutionOptions(Statement statement) {
+        String qry = "";
+
+        // Locale.ROOT keeps keyword detection independent from the JVM default locale
+        // (e.g. with Turkish locale "INSERT" is lower-cased to a string which does not start with "insert").
+        if (statement instanceof BoundStatement)
+            qry = ((BoundStatement)statement).preparedStatement().getQueryString().trim().toLowerCase(Locale.ROOT);
+        else if (statement instanceof PreparedStatement)
+            qry = ((PreparedStatement)statement).getQueryString().trim().toLowerCase(Locale.ROOT);
+
+        boolean readStatement = qry.startsWith("select");
+        boolean writeStatement = statement instanceof Batch || statement instanceof BatchStatement ||
+            qry.startsWith("insert") || qry.startsWith("delete") || qry.startsWith("update");
+
+        if (readStatement && readConsistency != null)
+            statement.setConsistencyLevel(readConsistency);
+
+        if (writeStatement && writeConsistency != null)
+            statement.setConsistencyLevel(writeConsistency);
+
+        if (fetchSize != null)
+            statement.setFetchSize(fetchSize);
+
+        return statement;
+    }
+
+    /**
+     * Handles situation when Cassandra table doesn't exist.
+     * <p>
+     * Only the thread which moves the handlers counter from {@code -1} to {@code 0} creates the keyspace,
+     * the table and its indexes. All other threads return after acquiring the handlers monitor, without
+     * doing any work.
+     *
+     * @param settings Persistence settings.
+     */
+    private void handleTableAbsenceError(KeyValuePersistenceSettings settings) {
+        // Counter is -1 when idle, so 0 identifies the first thread which entered the handler.
+        int hndNum = tblAbsenceHandlersCnt.incrementAndGet();
+
+        try {
+            synchronized (tblAbsenceHandlersCnt) {
+                // Oooops... I am not the first thread who tried to handle table absence problem.
+                // NOTE(review): nothing guarantees that the first thread acquired the monitor before this
+                // one, so the table might not be created yet at this point - confirm this is acceptable.
+                if (hndNum != 0) {
+                    log.warning("Table " + settings.getTableFullName() + " absence problem detected. " +
+                            "Another thread already fixed it.");
+                    return;
+                }
+
+                log.warning("Table " + settings.getTableFullName() + " absence problem detected. " +
+                        "Trying to create table.");
+
+                // Non-null initial value only lets the loop below start, it is reset on every iteration.
+                IgniteException error = new IgniteException("Failed to create Cassandra table " + settings.getTableFullName());
+
+                int attempt = 0;
+
+                // Loop ends on the first successful iteration (error stays null) or when attempts are exhausted.
+                while (error != null && attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+                    error = null;
+
+                    try {
+                        createKeyspace(settings);
+                        createTable(settings);
+                        createTableIndexes(settings);
+                    }
+                    catch (Throwable e) {
+                        // Only hosts availability problems are retried, anything else terminates the handler.
+                        if (CassandraHelper.isHostsAvailabilityError(e))
+                            handleHostsAvailabilityError(e, attempt, null);
+                        else
+                            throw new IgniteException("Failed to create Cassandra table " + settings.getTableFullName(), e);
+
+                        error = (e instanceof IgniteException) ? (IgniteException)e : new IgniteException(e);
+                    }
+
+                    attempt++;
+                }
+
+                if (error != null)
+                    throw error;
+            }
+        }
+        finally {
+            // Only the thread which did the work returns the counter to its idle value.
+            if (hndNum == 0)
+                tblAbsenceHandlersCnt.set(-1);
+        }
+    }
+
+    /**
+     * Handles situation when prepared statement execution failed cause session to the cluster was released.
+     * <p>
+     * Only the thread which moves the handlers counter from {@code -1} to {@code 0} refreshes the
+     * Cassandra session. All other threads return after acquiring the handlers monitor, without
+     * doing any work.
+     *
+     * @param e Exception which triggered the handler, used for logging only.
+     */
+    private void handlePreparedStatementClusterError(Throwable e) {
+        // Counter is -1 when idle, so 0 identifies the first thread which entered the handler.
+        int hndNum = prepStatementHandlersCnt.incrementAndGet();
+
+        try {
+            synchronized (prepStatementHandlersCnt) {
+                // Oooops... I am not the first thread who tried to handle prepared statement problem.
+                if (hndNum != 0) {
+                    log.warning("Prepared statement cluster error detected, another thread already fixed the problem", e);
+                    return;
+                }
+
+                log.warning("Prepared statement cluster error detected, refreshing Cassandra session", e);
+
+                refresh();
+
+                log.warning("Cassandra session refreshed");
+            }
+        }
+        finally {
+            // Only the thread which did the work returns the counter to its idle value.
+            if (hndNum == 0)
+                prepStatementHandlersCnt.set(-1);
+        }
+    }
+
+    /**
+     * Handles situation when Cassandra host which is responsible for CQL query execution became unavailable.
+     * <p>
+     * On a quarter, a half, three quarters and the last of the allowed attempts the Cassandra session is
+     * refreshed. On all other attempts the current thread sleeps for
+     * {@code CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT} milliseconds.
+     *
+     * @param e Exception to handle.
+     * @param attempt Zero-based number of the current attempt.
+     * @param msg Error message for the thrown exception, {@code null} to use the message of {@code e}.
+     * @return {@code true} if the Cassandra session was refreshed, {@code false} if the thread only slept.
+     * @throws IgniteException If the number of attempts reached its maximum.
+     */
+    private boolean handleHostsAvailabilityError(Throwable e, int attempt, String msg) {
+        if (attempt >= CQL_EXECUTION_ATTEMPTS_COUNT) {
+            log.error("Host availability problem detected. " +
+                    "Number of CQL execution attempts reached maximum " + CQL_EXECUTION_ATTEMPTS_COUNT +
+                    ", exception will be thrown to upper execution layer.", e);
+            throw msg == null ? new IgniteException(e) : new IgniteException(msg, e);
+        }
+
+        if (attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 4  ||
+            attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2  ||
+            attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2 + CQL_EXECUTION_ATTEMPTS_COUNT / 4  ||
+            attempt == CQL_EXECUTION_ATTEMPTS_COUNT - 1) {
+            log.warning("Host availability problem detected, CQL execution attempt  " + (attempt + 1) + ", " +
+                    "refreshing Cassandra session", e);
+
+            refresh();
+
+            log.warning("Cassandra session refreshed");
+
+            return true;
+        }
+
+        log.warning("Host availability problem detected, CQL execution attempt " + (attempt + 1) + ", " +
+                "sleeping extra " + CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT + " milliseconds", e);
+
+        try {
+            Thread.sleep(CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT);
+        }
+        catch (InterruptedException ignored) {
+            // Sleep is best-effort, but the interruption request must not be lost:
+            // restore the flag so code up the call stack can still react to it.
+            Thread.currentThread().interrupt();
+        }
+
+        log.warning("Sleep completed");
+
+        return false;
+    }
+
+    /**
+     * Creates a sleeper used to pause between CQL execution attempts, configured with the
+     * min timeout, max timeout and timeout increment constants of this class.
+     *
+     * @return New random sleeper.
+     */
+    private RandomSleeper newSleeper() {
+        RandomSleeper sleeper = new RandomSleeper(CQL_EXECUTION_ATTEMPT_MIN_TIMEOUT,
+            CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT, CQL_ATTEMPTS_TIMEOUT_INCREMENT, log);
+
+        return sleeper;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
new file mode 100644
index 0000000..867f58d
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+
+/**
+ * Provides information for single operations (load, delete, write) of Ignite cache
+ * backed by {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}.
+ * <p>
+ * An implementation supplies the CQL statement text, binds a prepared statement,
+ * exposes the persistence settings to use and converts a returned row into the
+ * operation result.
+ *
+ * @param <R> Type of the result returned from operation.
+ */
+public interface ExecutionAssistant<R> {
+    /**
+     * Indicates if Cassandra table existence is required for operation.
+     *
+     * @return {@code true} if table existence is required.
+     */
+    public boolean tableExistenceRequired();
+
+    /**
+     * Returns CQL statement to be used for operation.
+     *
+     * @return CQL statement text.
+     */
+    public String getStatement();
+
+    /**
+     * Binds prepared statement.
+     *
+     * @param statement Prepared statement to bind.
+     *
+     * @return Bound statement produced from the given prepared statement.
+     */
+    public BoundStatement bindStatement(PreparedStatement statement);
+
+    /**
+     * Persistence settings to use for operation.
+     *
+     * @return Persistence settings.
+     */
+    public KeyValuePersistenceSettings getPersistenceSettings();
+
+    /**
+     * Returns operation name.
+     *
+     * @return Operation name.
+     */
+    public String operationName();
+
+    /**
+     * Processes Cassandra database table row returned by specified CQL statement.
+     *
+     * @param row Cassandra database table row.
+     *
+     * @return Result of the operation.
+     */
+    public R process(Row row);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
new file mode 100644
index 0000000..e80583a
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.Row;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Implementation of the {@link org.apache.ignite.cache.store.cassandra.session.BatchExecutionAssistant}.
+ * <p>
+ * Remembers the sequence numbers of the rows it has already handled, so that a row passed again
+ * with the same sequence number is not processed twice.
+ *
+ * @param <R> Type of the result returned from batch operation
+ * @param <V> Type of the value used in batch operation
+ */
+public abstract class GenericBatchExecutionAssistant<R, V> implements BatchExecutionAssistant<R, V> {
+    /** Identifiers of already processed objects. */
+    private final Set<Integer> processed = new HashSet<>();
+
+    /** {@inheritDoc} */
+    @Override public void process(Row row, int seqNum) {
+        if (processed.contains(seqNum))
+            return;
+
+        process(row);
+
+        // Marked only after process(row) returned, so a row whose processing threw is not recorded.
+        processed.add(seqNum);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean alreadyProcessed(int seqNum) {
+        return processed.contains(seqNum);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int processedCount() {
+        return processed.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public R processedData() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tableExistenceRequired() {
+        return false;
+    }
+
+    /**
+     * Hook invoked by {@link #process(Row, int)} for every row whose sequence number
+     * was not processed yet. Does nothing by default.
+     *
+     * @param row Cassandra database table row.
+     */
+    protected void process(Row row) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
new file mode 100644
index 0000000..d3ace7d
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
+import org.apache.ignite.lang.IgniteBiInClosure;
+
+/**
+ * Worker for load cache using custom user query.
+ *
+ * @param <K> Key type.
+ * @param <V> Value type.
+ */
+public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> {
+    /** Cassandra session to execute CQL query */
+    private final CassandraSession ses;
+
+    /** User query. */
+    private final String qry;
+
+    /** Persistence controller */
+    private final PersistenceController ctrl;
+
+    /** Logger */
+    private final IgniteLogger log;
+
+    /** Closure for loaded values. */
+    private final IgniteBiInClosure<K, V> clo;
+
+    /**
+     * @param clo Closure for loaded values.
+     */
+    public LoadCacheCustomQueryWorker(CassandraSession ses, String qry, PersistenceController ctrl,
+        IgniteLogger log, IgniteBiInClosure<K, V> clo) {
+        this.ses = ses;
+        this.qry = qry.trim().endsWith(";") ? qry : qry + ";";
+        this.ctrl = ctrl;
+        this.log = log;
+        this.clo = clo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Void call() throws Exception {
+        ses.execute(new BatchLoaderAssistant() {
+            /** {@inheritDoc} */
+            @Override public String operationName() {
+                return "loadCache";
+            }
+
+            /** {@inheritDoc} */
+            @Override public Statement getStatement() {
+                return new SimpleStatement(qry);
+            }
+
+            /** {@inheritDoc} */
+            @Override public void process(Row row) {
+                K key;
+                V val;
+
+                try {
+                    key = (K)ctrl.buildKeyObject(row);
+                }
+                catch (Throwable e) {
+                    log.error("Failed to build Ignite key object from provided Cassandra row", e);
+
+                    throw new IgniteException("Failed to build Ignite key object from provided Cassandra row", e);
+                }
+
+                try {
+                    val = (V)ctrl.buildValueObject(row);
+                }
+                catch (Throwable e) {
+                    log.error("Failed to build Ignite value object from provided Cassandra row", e);
+
+                    throw new IgniteException("Failed to build Ignite value object from provided Cassandra row", e);
+                }
+
+                clo.apply(key, val);
+            }
+        });
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
new file mode 100644
index 0000000..fc4a907
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.pool;
+
+import com.datastax.driver.core.Session;
+import java.lang.Thread.State;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
+
+/**
+ * Cassandra driver sessions pool.
+ * <p>
+ * Driver sessions handed back through {@link #put(CassandraSessionImpl, Session)} are kept in a static map.
+ * All access to the map and to the monitor reference is guarded by the map's own lock. A daemon monitor
+ * thread periodically removes and closes expired sessions and stops itself once the pool is empty.
+ */
+public class SessionPool {
+    /**
+     * Monitors session pool and closes unused session.
+     */
+    private static class SessionMonitor extends Thread {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            // True only when this monitor stops because it saw the pool empty while holding the lock.
+            boolean poolDrained = false;
+
+            try {
+                while (true) {
+                    try {
+                        Thread.sleep(SLEEP_TIMEOUT);
+                    }
+                    catch (InterruptedException ignored) {
+                        // Interruption is the stop signal sent by release().
+                        return;
+                    }
+
+                    List<Map.Entry<CassandraSessionImpl, SessionWrapper>> expiredSessions = new LinkedList<>();
+
+                    synchronized (sessions) {
+                        for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : sessions.entrySet()) {
+                            if (entry.getValue().expired())
+                                expiredSessions.add(entry);
+                        }
+
+                        for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : expiredSessions)
+                            sessions.remove(entry.getKey());
+
+                        // The stop decision is taken under the lock and the singleton is cleared with it,
+                        // so a put() that follows always starts a fresh monitor for the session it adds.
+                        poolDrained = sessions.isEmpty();
+
+                        if (poolDrained && monitorSingleton == this)
+                            monitorSingleton = null;
+                    }
+
+                    // Sessions are closed outside of the lock to keep the critical section short.
+                    for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : expiredSessions)
+                        entry.getValue().release();
+
+                    // all sessions in the pool expired, thus we don't need additional thread to manage sessions in the pool
+                    if (poolDrained)
+                        return;
+                }
+            }
+            finally {
+                // After a clean stop the pool was empty and anything added later belongs to a new monitor,
+                // so the pool is flushed only on interruption or unexpected failure.
+                if (!poolDrained)
+                    release();
+            }
+        }
+    }
+
+    /** Sessions monitor sleep timeout. */
+    private static final long SLEEP_TIMEOUT = 60000; // 1 minute.
+
+    /** Sessions which were returned to pool. */
+    private static final Map<CassandraSessionImpl, SessionWrapper> sessions = new HashMap<>();
+
+    /** Currently active monitor or {@code null} if there is none. Guarded by {@link #sessions} lock. */
+    private static SessionMonitor monitorSingleton;
+
+    static {
+        // Close all pooled sessions on JVM shutdown.
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override public void run() {
+                release();
+            }
+        });
+    }
+
+    /**
+     * Returns Cassandra driver session to sessions pool. Starts the monitor thread if none is active.
+     * A driver session previously pooled for the same wrapper is closed. Does nothing if any argument
+     * is {@code null}.
+     *
+     * @param cassandraSes Session wrapper.
+     * @param driverSes Driver session.
+     */
+    public static void put(CassandraSessionImpl cassandraSes, Session driverSes) {
+        if (cassandraSes == null || driverSes == null)
+            return;
+
+        SessionWrapper old;
+
+        synchronized (sessions) {
+            old = sessions.put(cassandraSes, new SessionWrapper(driverSes));
+
+            if (monitorSingleton == null || State.TERMINATED.equals(monitorSingleton.getState())) {
+                monitorSingleton = new SessionMonitor();
+                monitorSingleton.setDaemon(true);
+                monitorSingleton.setName("Cassandra-sessions-pool");
+                monitorSingleton.start();
+            }
+        }
+
+        if (old != null)
+            old.release();
+    }
+
+    /**
+     * Extracts Cassandra driver session from pool.
+     *
+     * @param cassandraSes Session wrapper.
+     * @return Cassandra driver session or {@code null} if the pool holds none for the given wrapper.
+     */
+    public static Session get(CassandraSessionImpl cassandraSes) {
+        if (cassandraSes == null)
+            return null;
+
+        SessionWrapper wrapper;
+
+        synchronized (sessions) {
+            wrapper = sessions.remove(cassandraSes);
+        }
+
+        return wrapper == null ? null : wrapper.driverSession();
+    }
+
+    /**
+     * Releases all session from pool and closes all their connections to Cassandra database.
+     * When called from a thread other than the monitor, also signals the active monitor to stop.
+     */
+    public static void release() {
+        Collection<SessionWrapper> wrappers;
+
+        synchronized (sessions) {
+            wrappers = new LinkedList<>(sessions.values());
+
+            sessions.clear();
+
+            if (!(Thread.currentThread() instanceof SessionMonitor) && monitorSingleton != null) {
+                try {
+                    monitorSingleton.interrupt();
+
+                    // The interrupted monitor is on its way out; forget it so the next put() starts a new one.
+                    monitorSingleton = null;
+                }
+                catch (SecurityException ignored) {
+                    // Best effort: the monitor could not be interrupted and keeps running.
+                }
+            }
+        }
+
+        // Sessions are closed outside of the lock.
+        for (SessionWrapper wrapper : wrappers)
+            wrapper.release();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
new file mode 100644
index 0000000..7c5722b
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.pool;
+
+import com.datastax.driver.core.Session;
+import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
+
+/**
+ * Wrapper for Cassandra driver session, responsible for monitoring session expiration and its closing.
+ */
+public class SessionWrapper {
+    /** Expiration timeout for Cassandra driver session. */
+    public static final long DFLT_EXPIRATION_TIMEOUT = 300000;  // 5 minutes.
+
+    /** Cassandra driver session. */
+    private Session ses;
+
+    /** Wrapper creation time.  */
+    private long time;
+
+    /**
+     * Creates instance of Cassandra driver session wrapper.
+     *
+     * @param ses Cassandra driver session.
+     */
+    public SessionWrapper(Session ses) {
+        this.ses = ses;
+        this.time = System.currentTimeMillis();
+    }
+
+    /**
+     * Checks if Cassandra driver session expired.
+     *
+     * @return true if session expired.
+     */
+    public boolean expired() {
+        return System.currentTimeMillis() - time > DFLT_EXPIRATION_TIMEOUT;
+    }
+
+    /**
+     * Returns wrapped Cassandra driver session.
+     *
+     * @return Cassandra driver session.
+     */
+    public Session driverSession() {
+        return ses;
+    }
+
+    /**
+     * Closes wrapped Cassandra driver session
+     */
+    public void release() {
+        CassandraHelper.closeSession(ses);
+        ses = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83c26a91/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
new file mode 100644
index 0000000..4f40478
--- /dev/null
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.utils;
+
+import java.io.File;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+
+/**
+ * Command line tool which prints Cassandra keyspace and table DDL statements
+ * generated from persistence descriptor xml files.
+ */
+public class DDLGenerator {
+    /** Line printed above and below every report title. */
+    private static final String SEPARATOR = "-------------------------------------------------------------";
+
+    /**
+     * DDLGenerator entry point.
+     *
+     * @param args Paths to persistence descriptor files; nothing is done when {@code null} or empty.
+     */
+    public static void main(String[] args) {
+        if (args == null || args.length == 0)
+            return;
+
+        for (String path : args) {
+            File descriptor = new File(path);
+
+            if (descriptor.isFile())
+                printDdl(descriptor, path);
+            else
+                printTitle("Incorrect file specified: " + path);
+        }
+    }
+
+    /**
+     * Prints keyspace and table DDL for one descriptor file, or an error report if it can't be processed.
+     *
+     * @param descriptor Persistence descriptor file.
+     * @param path Path to the file as specified on the command line.
+     */
+    private static void printDdl(File descriptor, String path) {
+        try {
+            KeyValuePersistenceSettings settings = new KeyValuePersistenceSettings(descriptor);
+
+            printTitle("DDL for keyspace/table from file: " + path);
+            System.out.println();
+            System.out.println(settings.getKeyspaceDDLStatement());
+            System.out.println();
+            System.out.println(settings.getTableDDLStatement());
+            System.out.println();
+        }
+        catch (Throwable e) {
+            printTitle("Incorrect file specified: " + path);
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Prints a title framed by separator lines.
+     *
+     * @param title Title text.
+     */
+    private static void printTitle(String title) {
+        System.out.println(SEPARATOR);
+        System.out.println(title);
+        System.out.println(SEPARATOR);
+    }
+}


Mime
View raw message