ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject incubator-ignite git commit: # IGNITE-32: working on datastore
Date Mon, 26 Jan 2015 01:54:14 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-32 69feb3925 -> c9803b6ad


# IGNITE-32: working on datastore


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c9803b6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c9803b6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c9803b6a

Branch: refs/heads/ignite-32
Commit: c9803b6ad400b6daaf1ea54c6aa1bb1dfd1d3808
Parents: 69feb39
Author: AKuznetsov <akuznetsov@gridgain.com>
Authored: Mon Jan 26 08:53:55 2015 +0700
Committer: AKuznetsov <akuznetsov@gridgain.com>
Committed: Mon Jan 26 08:53:55 2015 +0700

----------------------------------------------------------------------
 .../apache/ignite/cache/store/CacheStore.java   |   2 +-
 .../ignite/cache/store/jdbc/JdbcCacheStore.java | 933 ++++++++++---------
 .../cache/store/jdbc/JdbcPojoCacheStore.java    |  45 +-
 .../store/jdbc/dialect/BasicJdbcDialect.java    |  88 +-
 .../cache/store/jdbc/dialect/JdbcDialect.java   | 101 ++
 .../PojoCacheStoreMultitreadedSelfTest.java     |   8 +-
 .../store/jdbc/PojoCacheStoreSelfTest.java      | 255 +++--
 7 files changed, 789 insertions(+), 643 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9803b6a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
index b127a18..2b72619 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
@@ -166,7 +166,7 @@ public abstract class CacheStore<K, V> implements CacheLoader<K, V>, CacheWriter
      *
      * @return Session for current cache operation.
      */
-    @Nullable public CacheStoreSession session() {
+    public CacheStoreSession session() {
         return ses;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9803b6a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
index 3de4ae4..afdec20 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
@@ -32,7 +32,6 @@ import org.jetbrains.annotations.*;
 import javax.cache.*;
 import javax.cache.integration.*;
 import javax.sql.*;
-import java.net.*;
 import java.sql.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -44,11 +43,11 @@ import java.util.concurrent.atomic.*;
  */
 public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
     /**
-     * Query cache by type.
+     * Entry mapping description.
      */
-    protected static class QueryCache {
+    protected static class EntryMapping {
         /** Database dialect. */
-        protected final BasicJdbcDialect dialect;
+        protected final JdbcDialect dialect;
 
         /** Select all items query. */
         protected final String loadCacheQry;
@@ -89,7 +88,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
         /**
          * @param typeMetadata Type metadata.
          */
-        public QueryCache(BasicJdbcDialect dialect, GridCacheQueryTypeMetadata typeMetadata) {
+        public EntryMapping(JdbcDialect dialect, GridCacheQueryTypeMetadata typeMetadata) {
             this.dialect = dialect;
 
             this.typeMetadata = typeMetadata;
@@ -221,17 +220,14 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
     /** Execute. */
     protected ExecutorService exec;
 
-    /** Paths to xml with type mapping description. */
-    protected Collection<String> typeMetadataPaths;
-
     /** Type mapping description. */
     protected Collection<GridCacheQueryTypeMetadata> typeMetadata;
 
     /** Cache with query by type. */
-    protected Map<Object, QueryCache> entryQtyCache;
+    protected Map<Object, EntryMapping> typeMeta;
 
     /** Database dialect. */
-    protected BasicJdbcDialect dialect;
+    protected JdbcDialect dialect;
 
     /** Max workers thread count. These threads are responsible for execute query. */
     protected int maxPoolSz = Runtime.getRuntime().availableProcessors();
@@ -240,12 +236,51 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
     protected int batchSz = DFLT_BATCH_SIZE;
 
     /**
+     * Get field value from object.
+     *
+     * @param typeName Type name.
+     * @param fieldName Field name.
+     * @param obj Cache object.
+     * @return Field value from object.
+     */
+    @Nullable protected abstract Object extractField(String typeName, String fieldName, Object obj)
+        throws CacheException;
+
+    /**
+     * Construct object from query result.
+     *
+     * @param <R> Type of result object.
+     * @param typeName Type name.
+     * @param fields Fields descriptors.
+     * @param rs ResultSet.
+     * @return Constructed object.
+     */
+    protected abstract <R> R buildObject(String typeName, Collection<GridCacheQueryTypeDescriptor> fields, ResultSet rs)
+        throws CacheLoaderException;
+
+    /**
+     * Extract type key from object.
+     *
+     * @param key Key object.
+     * @return Type key.
+     * @throws CacheException If failed to extract type key.
+     */
+    protected abstract Object typeId(Object key) throws CacheException;
+
+    /**
+     * Build cache for mapped types.
+     *
+     * @throws CacheException If failed to initialize.
+     */
+    protected abstract void buildTypeCache() throws CacheException;
+
+    /**
      * Perform dialect resolution.
      *
      * @return The resolved dialect.
-     * @throws IgniteCheckedException Indicates problems accessing the metadata.
+     * @throws CacheException Indicates problems accessing the metadata.
      */
-    protected BasicJdbcDialect resolveDialect() throws IgniteCheckedException {
+    protected JdbcDialect resolveDialect() throws CacheException {
         Connection conn = null;
 
         String dbProductName = null;
@@ -256,10 +291,10 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
             dbProductName = conn.getMetaData().getDatabaseProductName();
         }
         catch (SQLException e) {
-            throw new IgniteCheckedException("Failed access to metadata for detect database dialect.", e);
+            throw new CacheException("Failed access to metadata for detect database dialect.", e);
         }
         finally {
-            closeConnection(conn);
+            U.closeQuiet(conn);
         }
 
         if ("H2".equals(dbProductName))
@@ -283,49 +318,23 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
     /**
      * Initializes store.
      *
-     * @throws IgniteCheckedException If failed to initialize.
+     * @throws CacheException If failed to initialize.
      */
-    protected void init() throws IgniteCheckedException {
+    protected void init() throws CacheException {
         if (initLatch.getCount() > 0) {
             if (initGuard.compareAndSet(false, true)) {
                 if (log.isDebugEnabled())
                     log.debug("Initializing cache store.");
 
                 if (dataSrc == null && F.isEmpty(connUrl))
-                    throw new IgniteCheckedException("Failed to initialize cache store (connection is not provided).");
+                    throw new CacheException("Failed to initialize cache store (connection is not provided).");
 
                 if (dialect == null)
                     dialect = resolveDialect();
 
                 try {
-                    if (typeMetadata == null) {
-                        if (typeMetadataPaths == null)
-                            throw new IgniteCheckedException(
-                                "Failed to initialize cache store (metadata paths is not provided).");
-
-// TODO: IGNITE-32 Replace with reading from config.
-//                        GridSpringProcessor spring = SPRING.create(false);
-
-                        Collection<GridCacheQueryTypeMetadata> typeMeta = new ArrayList<>();
-
-                        for (String path : typeMetadataPaths) {
-                            URL url = U.resolveGridGainUrl(path);
-// TODO: IGNITE-32 Replace with reading from config.
-//                            if (url != null) {
-//                                Map<String, Object> beans = spring.loadBeans(url, GridCacheQueryTypeMetadata.class).
-//                                    get(GridCacheQueryTypeMetadata.class);
-//
-//                                if (beans != null)
-//                                    for (Object bean : beans.values())
-//                                        if (bean instanceof GridCacheQueryTypeMetadata)
-//                                            typeMeta.add((GridCacheQueryTypeMetadata)bean);
-//                            }
-//                            else
-                                log.warning("Failed to resolve metadata path: " + path);
-                        }
-
-                        setTypeMetadata(typeMeta);
-                    }
+                    if (typeMetadata == null)
+                        throw new CacheException("Failed to initialize cache store (mappping description is not provided).");
 
                     exec = Executors.newFixedThreadPool(maxPoolSz);
 
@@ -338,26 +347,19 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
                 }
             }
             else
-                U.await(initLatch);
+                try {
+                    if (initLatch.getCount() > 0)
+                        initLatch.await();
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+
+                    throw new CacheException(e);
+                }
         }
 
         if (!initOk)
-            throw new IgniteCheckedException("Cache store was not properly initialized.");
-    }
-
-    /**
-     * Closes allocated resources depending on transaction status.
-     *
-     * @param tx Active transaction, if any.
-     * @param conn Allocated connection.
-     * @param st Created statement,
-     */
-    protected void end(@Nullable IgniteTx tx, @Nullable Connection conn, @Nullable Statement st) {
-        U.closeQuiet(st);
-
-        if (tx == null)
-            // Close connection right away if there is no transaction.
-            closeConnection(conn);
+            throw new CacheException("Cache store was not properly initialized.");
     }
 
     /**
@@ -377,29 +379,23 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
     }
 
     /**
-     * Closes connection.
-     *
-     * @param conn Connection to close.
-     */
-    protected void closeConnection(@Nullable Connection conn) {
-        U.closeQuiet(conn);
-    }
-
-    /**
-     * @param tx Cache transaction.
      * @return Connection.
      * @throws SQLException In case of error.
      */
-    protected Connection connection(@Nullable IgniteTx tx) throws SQLException {
-        if (tx != null) {
-            Connection conn = null;// TODO: IGNITE-32 FIXME tx.meta(ATTR_CONN);
+    protected Connection connection() throws SQLException {
+        CacheStoreSession ses = session();
+
+        if (ses.transaction() != null) {
+            Map<String, Connection> prop = ses.properties();
+
+            Connection conn = prop.get(ATTR_CONN);
 
             if (conn == null) {
                 conn = openConnection(false);
 
-                // Store connection in transaction metadata, so it can be accessed
-                // for other operations on the same transaction.
-                // TODO: IGNITE-32 FIXME tx.addMeta(ATTR_CONN, conn);
+                // Store connection in session, so it can be accessed
+                // for other operations on the same session.
+                prop.put(ATTR_CONN, conn);
             }
 
             return conn;
@@ -409,83 +405,30 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
             return openConnection(true);
     }
 
-    /** {@inheritDoc} */
-    public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException {
-        init();
-
-        Connection conn = null; // TODO: IGNITE-32 FIXME tx.removeMeta(ATTR_CONN);
-
-        if (conn != null) {
-            try {
-                if (commit)
-                    conn.commit();
-                else
-                    conn.rollback();
-            }
-            catch (SQLException e) {
-                throw new IgniteCheckedException(
-                    "Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e);
-            }
-            finally {
-                closeConnection(conn);
-            }
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']');
-    }
-
     /**
-     * Extract database column names from {@link GridCacheQueryTypeDescriptor}.
-     *
-     * @param dsc collection of {@link GridCacheQueryTypeDescriptor}.
-     */
-    protected static Collection<String> databaseColumns(Collection<GridCacheQueryTypeDescriptor> dsc) {
-        return F.transform(dsc, new C1<GridCacheQueryTypeDescriptor, String>() {
-            /** {@inheritDoc} */
-            @Override public String apply(GridCacheQueryTypeDescriptor desc) {
-                return desc.getDbName();
-            }
-        });
-    }
-
-    /**
-     * Get field value from object.
+     * Closes connection.
      *
-     * @param typeName Type name.
-     * @param fieldName Field name.
-     * @param obj Cache object.
-     * @return Field value from object.
+     * @param conn Connection to close.
      */
-    @Nullable protected abstract Object extractField(String typeName, String fieldName, Object obj)
-        throws IgniteCheckedException;
+    protected void closeConnection(@Nullable Connection conn) {
+        CacheStoreSession ses = session();
 
-    /**
-     * Construct object from query result.
-     *
-     * @param <R> Type of result object.
-     * @param typeName Type name.
-     * @param fields Fields descriptors.
-     * @param rs ResultSet.
-     * @return Constructed object.
-     */
-    protected abstract <R> R buildObject(String typeName, Collection<GridCacheQueryTypeDescriptor> fields, ResultSet rs)
-        throws IgniteCheckedException;
+        // Close connection right away if there is no transaction.
+        if (ses.transaction() == null)
+            U.closeQuiet(conn);
+    }
 
     /**
-     * Extract type key from object.
+     * Closes allocated resources depending on transaction status.
      *
-     * @param key Key object.
-     * @return Type key.
+     * @param conn Allocated connection.
+     * @param st Created statement.
      */
-    protected abstract Object typeKey(K key);
+    protected void end(@Nullable Connection conn, @Nullable Statement st) {
+        U.closeQuiet(st);
 
-    /**
-     * Build cache for mapped types.
-     *
-     * @throws IgniteCheckedException If failed to initialize.
-     */
-    protected abstract void buildTypeCache() throws IgniteCheckedException;
+        closeConnection(conn);
+    }
 
     /** {@inheritDoc} */
     @Override public void loadCache(final IgniteBiInClosure<K, V> clo, @Nullable Object... args)
@@ -498,40 +441,40 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
 
             Collection<Future<?>> futs = new ArrayList<>();
 
-            for (final QueryCache type : entryQtyCache.values())
+            for (final EntryMapping type : typeMeta.values())
                 futs.add(exec.submit(new Callable<Void>() {
                     @Override public Void call() throws Exception {
-                        Connection conn = null;
+                    Connection conn = null;
 
-                        try {
-                            PreparedStatement stmt = null;
+                    try {
+                        PreparedStatement stmt = null;
 
-                            try {
-                                conn = connection(null);
+                        try {
+                            conn = connection();
 
-                                stmt = conn.prepareStatement(type.loadCacheQry);
+                            stmt = conn.prepareStatement(type.loadCacheQry);
 
-                                ResultSet rs = stmt.executeQuery();
+                            ResultSet rs = stmt.executeQuery();
 
-                                while (rs.next()) {
-                                    K key = buildObject(type.keyType(), type.keyDescriptors(), rs);
-                                    V val = buildObject(type.valueType(), type.valueDescriptors(), rs);
+                            while (rs.next()) {
+                                K key = buildObject(type.keyType(), type.keyDescriptors(), rs);
+                                V val = buildObject(type.valueType(), type.valueDescriptors(), rs);
 
-                                    clo.apply(key, val);
-                                }
-                            }
-                            catch (SQLException e) {
-                                throw new IgniteCheckedException("Failed to load cache", e);
-                            }
-                            finally {
-                                U.closeQuiet(stmt);
+                                clo.apply(key, val);
                             }
                         }
+                        catch (SQLException e) {
+                            throw new IgniteCheckedException("Failed to load cache", e);
+                        }
                         finally {
-                            closeConnection(conn);
+                            U.closeQuiet(stmt);
                         }
+                    }
+                    finally {
+                        U.closeQuiet(conn);
+                    }
 
-                        return null;
+                    return null;
                     }
                 }));
 
@@ -543,86 +486,57 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
         }
     }
 
-    /**
-     * @param stmt Prepare statement.
-     * @param i Start index for parameters.
-     * @param type Type description.
-     * @param key Key object.
-     * @return Next index for parameters.
-     */
-    protected int fillKeyParameters(PreparedStatement stmt, int i, QueryCache type,
-        K key) throws IgniteCheckedException {
-        for (GridCacheQueryTypeDescriptor field : type.keyDescriptors()) {
-            Object fieldVal = extractField(type.keyType(), field.getJavaName(), key);
+    /** {@inheritDoc} */
+    @Override public void txEnd(boolean commit) throws CacheWriterException {
+        CacheStoreSession ses = session();
 
-            try {
-                if (fieldVal != null)
-                    stmt.setObject(i++, fieldVal);
-                else
-                    stmt.setNull(i++, field.getDbType());
-            }
-            catch (SQLException e) {
-                throw new IgniteCheckedException("Failed to set statement parameter name: " + field.getDbName(), e);
-            }
-        }
+        IgniteTx tx = ses.transaction();
 
-        return i;
-    }
+        Connection conn = ses.<String, Connection>properties().remove(ATTR_CONN);
 
-    /**
-     * @param stmt Prepare statement.
-     * @param type Type description.
-     * @param key Key object.
-     * @return Next index for parameters.
-     */
-    protected int fillKeyParameters(PreparedStatement stmt, QueryCache type, K key) throws IgniteCheckedException {
-        return fillKeyParameters(stmt, 1, type, key);
-    }
 
-    /**
-     * @param stmt Prepare statement.
-     * @param i Start index for parameters.
-     * @param type Type description.
-     * @param val Value object.
-     * @return Next index for parameters.
-     */
-    protected int fillValueParameters(PreparedStatement stmt, int i, QueryCache type, V val)
-        throws IgniteCheckedException {
-        for (GridCacheQueryTypeDescriptor field : type.uniqValFields) {
-            Object fieldVal = extractField(type.valueType(), field.getJavaName(), val);
+        if (conn != null) {
+            assert tx != null;
 
             try {
-                if (fieldVal != null)
-                    stmt.setObject(i++, fieldVal);
+                if (commit)
+                    conn.commit();
                 else
-                    stmt.setNull(i++, field.getDbType());
+                    conn.rollback();
             }
             catch (SQLException e) {
-                throw new IgniteCheckedException("Failed to set statement parameter name: " + field.getDbName(), e);
+                throw new CacheWriterException(
+                    "Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e);
+            }
+            finally {
+                U.closeQuiet(conn);
             }
         }
 
-        return i;
+        if (tx != null && log.isDebugEnabled())
+            log.debug("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']');
     }
 
     /** {@inheritDoc} */
-    @Nullable public V load(@Nullable IgniteTx tx, K key) throws IgniteCheckedException {
+    @Nullable @Override public V load(K key) throws CacheLoaderException {
+        assert key != null;
+
         init();
 
-        QueryCache type = entryQtyCache.get(typeKey(key));
+        EntryMapping type = typeMeta.get(typeId(key));
 
         if (type == null)
-            throw new IgniteCheckedException("Failed to find mapping description for type: " + key.getClass());
+            throw new CacheLoaderException("Failed to find store mapping description for key: " + key);
 
         if (log.isDebugEnabled())
-            log.debug("Start load value from db by key: " + key);
+            log.debug("Start load value from database by key: " + key);
 
         Connection conn = null;
 
         PreparedStatement stmt = null;
 
         try {
-            conn = connection(tx);
+            conn = connection();
 
             stmt = conn.prepareStatement(type.loadQrySingle);
 
@@ -634,143 +548,108 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
                 return buildObject(type.valueType(), type.valueDescriptors(), rs);
         }
         catch (SQLException e) {
-            throw new IgniteCheckedException("Failed to load object by key: " + key, e);
+            throw new CacheLoaderException("Failed to load object by key: " + key, e);
         }
         finally {
-            end(tx, conn, stmt);
+            end(conn, stmt);
         }
 
         return null;
     }
 
-    /**
-     * Loads all values for given keys with same type and passes every value to the provided closure.
-     *
-     * @param tx Cache transaction, if write-behind is not enabled, null otherwise.
-     * @param qry Query cache for type.
-     * @param keys Collection of keys to load.
-     * @param c Closure to call for every loaded element.
-     * @throws IgniteCheckedException If load failed.
-     */
-    protected void loadAll(@Nullable IgniteTx tx, QueryCache qry, Collection<? extends K> keys,
-        IgniteBiInClosure<K, V> c) throws IgniteCheckedException {
+    /** {@inheritDoc} */
+    @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
+        assert keys != null;
+
         init();
 
         Connection conn = null;
-
-        PreparedStatement stmt = null;
-
         try {
-            conn = connection(tx);
-
-            stmt = conn.prepareStatement(qry.loadQuery(keys.size()));
+            conn = connection();
 
-            int i = 1;
+            Map<Object, LoadWorker<K, V>> workers = U.newHashMap(typeMeta.size());
 
-            for (K key : keys) {
-                for (GridCacheQueryTypeDescriptor field : qry.keyDescriptors()) {
-                    Object fieldVal = extractField(qry.keyType(), field.getJavaName(), key);
+            Collection<Future<Map<K, V>>> futs = new ArrayList<>();
 
-                    if (fieldVal != null)
-                        stmt.setObject(i++, fieldVal);
-                    else
-                        stmt.setNull(i++, field.getDbType());
-                }
-            }
-
-            ResultSet rs = stmt.executeQuery();
-
-            while (rs.next()) {
-                K key = buildObject(qry.keyType(), qry.keyDescriptors(), rs);
-                V val = buildObject(qry.valueType(), qry.valueDescriptors(), rs);
+            int cnt = 0;
 
-                c.apply(key, val);
-            }
-        }
-        catch (SQLException e) {
-            throw new IgniteCheckedException("Failed to load objects", e);
-        }
-        finally {
-            end(tx, conn, stmt);
-        }
-    }
+            for (K key : keys) {
+                Object typeId = typeId(key);
 
-    /** {@inheritDoc} */
-    public void loadAll(@Nullable final IgniteTx tx, Collection<? extends K> keys,
-        final IgniteBiInClosure<K, V> c) throws IgniteCheckedException {
-        assert keys != null;
+                final EntryMapping m = typeMeta.get(typeId);
 
-        init();
+                if (m == null)
+                    throw new CacheWriterException("Failed to find store mapping description for key: " + key);
 
-        Map<QueryCache, Collection<K>> splittedKeys = U.newHashMap(entryQtyCache.size());
+                LoadWorker<K, V> worker = workers.get(typeId);
 
-        final Collection<Future<?>> futs = new ArrayList<>();
+                if (worker == null)
+                    workers.put(typeId, worker = new LoadWorker<>(conn, m));
 
-        for (K key : keys) {
-            final QueryCache qry = entryQtyCache.get(typeKey(key));
+                worker.keys.add(key);
 
-            Collection<K> batch = splittedKeys.get(qry);
+                if (worker.keys.size() == m.maxKeysPerStmt)
+                    futs.add(exec.submit(workers.remove(typeId)));
 
-            if (batch == null)
-                splittedKeys.put(qry, batch = new ArrayList<>());
+                cnt ++;
+            }
 
-            batch.add(key);
+            for (LoadWorker<K, V> worker : workers.values())
+                futs.add(exec.submit(worker));
 
-            if (batch.size() == qry.maxKeysPerStmt) {
-                final Collection<K> p = splittedKeys.remove(qry);
+            Map<K, V> res = U.newHashMap(cnt);
 
-                futs.add(exec.submit(new Callable<Void>() {
-                    @Override public Void call() throws Exception {
-                        loadAll(tx, qry, p, c);
+            for (Future<Map<K, V>> fut : futs)
+                res.putAll(U.get(fut));
 
-                        return null;
-                    }
-                }));
-            }
+            return res;
+        }
+        catch (SQLException e) {
+            throw new CacheWriterException("Failed to open connection", e);
+        }
+        catch (IgniteCheckedException e) {
+            throw new CacheWriterException("Failed to load entries from database", e);
+        }
+        finally {
+            closeConnection(conn);
         }
-
-        for (final Map.Entry<QueryCache, Collection<K>> entry : splittedKeys.entrySet())
-            futs.add(exec.submit(new Callable<Void>() {
-                @Override public Void call() throws Exception {
-                    loadAll(tx, entry.getKey(), entry.getValue(), c);
-
-                    return null;
-                }
-            }));
-
-        for (Future<?> fut : futs)
-            U.get(fut);
     }
 
     /** {@inheritDoc} */
-    public void put(@Nullable IgniteTx tx, K key, V val) throws IgniteCheckedException {
+    @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
+        assert entry != null;
+
         init();
 
-        QueryCache type = entryQtyCache.get(typeKey(key));
+        K key = entry.getKey();
+
+        EntryMapping type = typeMeta.get(typeId(key));
 
         if (type == null)
-            throw new IgniteCheckedException("Failed to find metadata for type: " + key.getClass());
+            throw new CacheWriterException("Failed to find store mapping description for entry: " + entry);
 
         if (log.isDebugEnabled())
-            log.debug("Start put value in db: (" + key + ", " + val);
+            log.debug("Start write entry to database: " + entry);
 
         Connection conn = null;
 
         PreparedStatement stmt = null;
 
         try {
-            conn = connection(tx);
+            conn = connection();
 
             if (dialect.hasMerge()) {
                 stmt = conn.prepareStatement(type.mergeQry);
 
                 int i = fillKeyParameters(stmt, type, key);
 
-                fillValueParameters(stmt, i, type, val);
+                fillValueParameters(stmt, i, type, entry.getValue());
 
                 stmt.executeUpdate();
             }
             else {
+                V val = entry.getValue();
+
                 stmt = conn.prepareStatement(type.updQry);
 
                 int i = fillValueParameters(stmt, 1, type, val);
@@ -791,122 +670,133 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
             }
         }
         catch (SQLException e) {
-            throw new IgniteCheckedException("Failed to put object by key: " + key, e);
+            throw new CacheWriterException("Failed to write entry to database: " + entry, e);
         }
         finally {
-            end(tx, conn, stmt);
+            end(conn, stmt);
         }
     }
 
-    /**
-     * Stores given key value pairs in persistent storage.
-     *
-     * @param tx Cache transaction, if write-behind is not enabled, null otherwise.
-     * @param qry Query cache for type.
-     * @param map Values to store.
-     * @throws IgniteCheckedException If store failed.
-     */
     /** {@inheritDoc} */
-    protected void putAll(@Nullable IgniteTx tx, QueryCache qry, Iterable<Map.Entry<? extends K, ? extends V>> map)
-        throws IgniteCheckedException {
-        assert map != null;
+    @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries)
+        throws CacheWriterException {
+        assert entries != null;
 
         init();
 
         Connection conn = null;
 
-        PreparedStatement stmt = null;
-
         try {
-            conn = connection(tx);
+            conn = connection();
 
-            stmt = conn.prepareStatement(qry.mergeQry);
+            if (dialect.hasMerge()) {
+                Map<Object, WriteWorker> workers = U.newHashMap(typeMeta.size());
 
-            int cnt = 0;
+                Collection<Future<?>> futs = new ArrayList<>();
 
-            for (Map.Entry<? extends K, ? extends V> entry : map) {
-                int i = fillKeyParameters(stmt, qry, entry.getKey());
+                for (Cache.Entry<? extends K, ? extends V> entry : entries) {
+                    Object typeId = typeId(entry.getKey());
 
-                fillValueParameters(stmt, i, qry, entry.getValue());
+                    final EntryMapping m = typeMeta.get(typeId);
 
-                stmt.addBatch();
+                    if (m == null)
+                        throw new CacheWriterException("Failed to find store mapping description for key: " +
+                            entry.getKey());
 
-                if (cnt++ % batchSz == 0)
-                    stmt.executeBatch();
+                    WriteWorker worker = workers.get(typeId);
+
+                    if (worker == null)
+                        workers.put(typeId, worker = new WriteWorker(conn, m));
+
+                    worker.entries.add(entry);
+
+                    if (worker.entries.size() == batchSz)
+                        futs.add(exec.submit(workers.remove(typeId)));
+                }
+
+                for (WriteWorker worker : workers.values())
+                    futs.add(exec.submit(worker));
+
+                for (Future<?> fut : futs)
+                    U.get(fut);
             }
+            else {
+                Map<Object, T2<PreparedStatement, PreparedStatement>> stmtByType = U.newHashMap(typeMeta.size());
 
-            if (cnt % batchSz != 0)
-                stmt.executeBatch();
-        }
-        catch (SQLException e) {
-            throw new IgniteCheckedException("Failed to put objects", e);
-        }
-        finally {
-            end(tx, conn, stmt);
-        }
-    }
+                for (Cache.Entry<? extends K, ? extends V> entry : entries) {
+                    Object typeId = typeId(entry.getKey());
 
-    /** {@inheritDoc} */
-    public void putAll(@Nullable final IgniteTx tx, Map<? extends K, ? extends V> map)
-        throws IgniteCheckedException {
-        assert map != null;
+                    final EntryMapping m = typeMeta.get(typeId);
 
-        init();
+                    if (m == null)
+                        throw new CacheWriterException("Failed to find store mapping description for key: " +
+                            entry.getKey());
 
-        Map<Object, Collection<Map.Entry<? extends K, ? extends V>>> keyByType = U.newHashMap(entryQtyCache.size());
+                    T2<PreparedStatement, PreparedStatement> stmts = stmtByType.get(typeId);
 
-        if (dialect.hasMerge()) {
-            for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
-                Object typeKey = typeKey(entry.getKey());
+                    if (stmts == null)
+                        stmtByType.put(typeId,
+                            stmts = new T2<>(conn.prepareStatement(m.updQry), conn.prepareStatement(m.insQry)));
 
-                Collection<Map.Entry<? extends K, ? extends V>> batch = keyByType.get(typeKey);
+                    PreparedStatement stmt = stmts.get1();
 
-                if (batch == null)
-                    keyByType.put(typeKey, batch = new ArrayList<>());
+                    assert stmt != null;
 
-                batch.add(entry);
-            }
+                    int i = fillValueParameters(stmt, 1, m, entry.getValue());
 
-            final Collection<Future<?>> futs = new ArrayList<>();
+                    fillKeyParameters(stmt, i, m, entry.getKey());
 
-            for (final Map.Entry<Object, Collection<Map.Entry<? extends K, ? extends V>>> e : keyByType.entrySet()) {
-                final QueryCache qry = entryQtyCache.get(e.getKey());
+                    if (stmt.executeUpdate() == 0) {
+                        stmt = stmts.get2();
 
-                futs.add(exec.submit(new Callable<Void>() {
-                    @Override public Void call() throws Exception {
-                        putAll(tx, qry, e.getValue());
+                        assert stmt != null;
+
+                        i = fillKeyParameters(stmt, m, entry.getKey());
+
+                        fillValueParameters(stmt, i, m, entry.getValue());
 
-                        return null;
+                        stmt.executeUpdate();
                     }
-                }));
-            }
+                }
 
-            for (Future<?> fut : futs)
-                U.get(fut);
+                for (T2<PreparedStatement, PreparedStatement> stmts :  stmtByType.values()) {
+                    U.closeQuiet(stmts.get1());
+
+                    U.closeQuiet(stmts.get2());
+                }
+            }
+        }
+        catch (SQLException e) {
+            throw new CacheWriterException("Failed to open connection", e);
+        }
+        catch (IgniteCheckedException e) {
+            throw new CacheWriterException("Failed to write values into database", e);
+        }
+        finally {
+            closeConnection(conn);
         }
-        else
-            for (Map.Entry<? extends K, ? extends V> e : map.entrySet())
-                put(tx, e.getKey(), e.getValue());
     }
 
     /** {@inheritDoc} */
-    public void remove(@Nullable IgniteTx tx, K key) throws IgniteCheckedException {
+    @Override public void delete(Object key) throws CacheWriterException {
+        assert key != null;
+
         init();
 
-        QueryCache type = entryQtyCache.get(typeKey(key));
+        EntryMapping type = typeMeta.get(typeId(key));
 
         if (type == null)
-            throw new IgniteCheckedException("Failed to find metadata for type: " + key.getClass());
+            throw new CacheWriterException("Failed to find store mapping description for key: " + key);
 
         if (log.isDebugEnabled())
-            log.debug("Start remove value from db by key: " + key);
+            log.debug("Start remove value from database by key: " + key);
 
         Connection conn = null;
 
         PreparedStatement stmt = null;
 
         try {
-            conn = connection(tx);
+            conn = connection();
 
             stmt = conn.prepareStatement(type.remQry);
 
@@ -915,83 +805,136 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
             stmt.executeUpdate();
         }
         catch (SQLException e) {
-            throw new IgniteCheckedException("Failed to load object by key: " + key, e);
+            throw new CacheWriterException("Failed to remove value from database by key: " + key, e);
         }
         finally {
-            end(tx, conn, stmt);
+            end(conn, stmt);
         }
     }
 
-    /**
-     * Removes all vales identified by given keys from persistent storage.
-     *
-     * @param tx Cache transaction, if write-behind is not enabled, null otherwise.
-     * @param qry Query cache for type.
-     * @param keys Collection of keys to remove.
-     * @throws IgniteCheckedException If remove failed.
-     */
-    protected void removeAll(@Nullable IgniteTx tx, QueryCache qry, Collection<? extends K> keys)
-        throws IgniteCheckedException {
-        assert keys != null && !keys.isEmpty();
+    /** {@inheritDoc} */
+    @Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
+        assert keys != null;
 
-        init();
+        Connection conn = null;
 
-        if (log.isDebugEnabled())
-            log.debug("Start remove values by keys: " + Arrays.toString(keys.toArray()));
+        try {
+            conn = connection();
 
-        Connection conn = null;
+            Collection<Future<?>> futs = new ArrayList<>();
 
-        PreparedStatement stmt = null;
+            Map<Object, DeleteWorker> workers = U.newHashMap(typeMeta.size());
 
-        try {
-            conn = connection(tx);
+            for (Object key : keys) {
+                Object typeId = typeId(key);
 
-            stmt = conn.prepareStatement(qry.remQry);
+                final EntryMapping m = typeMeta.get(typeId);
 
-            int cnt = 0;
+                if (m == null)
+                    throw new CacheWriterException("Failed to find store mapping description for key: " + key);
 
-            for (K key : keys) {
-                fillKeyParameters(stmt, qry, key);
+                DeleteWorker worker = workers.get(typeId);
+
+                if (worker == null)
+                    workers.put(typeId, worker = new DeleteWorker(conn, m));
 
-                stmt.addBatch();
+                worker.keys.add(key);
 
-                if (cnt++ % batchSz == 0)
-                    stmt.executeBatch();
+                if (worker.keys.size() == batchSz)
+                    futs.add(exec.submit(workers.remove(typeId)));
             }
 
-            if (cnt % batchSz != 0)
-                stmt.executeBatch();
+            for (DeleteWorker worker : workers.values())
+                futs.add(exec.submit(worker));
+
+            for (Future<?> fut : futs)
+                U.get(fut);
         }
         catch (SQLException e) {
-            throw new IgniteCheckedException("Failed to remove values by keys.", e);
+            throw new CacheWriterException("Failed to open connection", e);
+        }
+        catch (IgniteCheckedException e) {
+            throw new CacheWriterException("Failed to remove values from database", e);
         }
         finally {
-            end(tx, conn, stmt);
+            closeConnection(conn);
         }
     }
 
-    /** {@inheritDoc} */
-    public void removeAll(@Nullable IgniteTx tx, Collection<? extends K> keys) throws IgniteCheckedException {
-        assert keys != null;
-
-        Map<Object, Collection<K>> keyByType = U.newHashMap(entryQtyCache.size());
+    /**
+     * Extract database column names from {@link GridCacheQueryTypeDescriptor}.
+     *
+     * @param dsc collection of {@link GridCacheQueryTypeDescriptor}.
+     */
+    protected static Collection<String> databaseColumns(Collection<GridCacheQueryTypeDescriptor> dsc) {
+        return F.transform(dsc, new C1<GridCacheQueryTypeDescriptor, String>() {
+            /** {@inheritDoc} */
+            @Override public String apply(GridCacheQueryTypeDescriptor desc) {
+                return desc.getDbName();
+            }
+        });
+    }
 
-        for (K key : keys) {
-            Object typeKey = typeKey(key);
+    /**
+     * @param stmt Prepared statement.
+     * @param i Start index for parameters.
+     * @param type Type description.
+     * @param key Key object.
+     * @return Next index for parameters.
+     */
+    protected int fillKeyParameters(PreparedStatement stmt, int i, EntryMapping type,
+        Object key) throws CacheException {
+        for (GridCacheQueryTypeDescriptor field : type.keyDescriptors()) {
+            Object fieldVal = extractField(type.keyType(), field.getJavaName(), key);
 
-            Collection<K> batch = keyByType.get(typeKey);
+            try {
+                if (fieldVal != null)
+                    stmt.setObject(i++, fieldVal);
+                else
+                    stmt.setNull(i++, field.getDbType());
+            }
+            catch (SQLException e) {
+                throw new CacheException("Failed to set statement parameter name: " + field.getDbName(), e);
+            }
+        }
 
-            if (batch == null)
-                keyByType.put(typeKey, batch = new ArrayList<>());
+        return i;
+    }
 
-            batch.add(key);
-        }
+    /**
+     * @param stmt Prepared statement.
+     * @param type Type description.
+     * @param key Key object.
+     * @return Next index for parameters.
+     */
+    protected int fillKeyParameters(PreparedStatement stmt, EntryMapping type, Object key) throws CacheException {
+        return fillKeyParameters(stmt, 1, type, key);
+    }
 
-        for (Map.Entry<Object, Collection<K>> entry : keyByType.entrySet()) {
-            QueryCache qry = entryQtyCache.get(entry.getKey());
+    /**
+     * @param stmt Prepared statement.
+     * @param i Start index for parameters.
+     * @param type Type description.
+     * @param val Value object.
+     * @return Next index for parameters.
+     */
+    protected int fillValueParameters(PreparedStatement stmt, int i, EntryMapping type, Object val)
+        throws CacheWriterException {
+        for (GridCacheQueryTypeDescriptor field : type.uniqValFields) {
+            Object fieldVal = extractField(type.valueType(), field.getJavaName(), val);
 
-            removeAll(tx, qry, entry.getValue());
+            try {
+                if (fieldVal != null)
+                    stmt.setObject(i++, fieldVal);
+                else
+                    stmt.setNull(i++, field.getDbType());
+            }
+            catch (SQLException e) {
+                throw new CacheWriterException("Failed to set statement parameter name: " + field.getDbName(), e);
+            }
         }
+
+        return i;
     }
 
     /**
@@ -1051,22 +994,6 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
     }
 
     /**
-     * @return Paths to xml with type mapping description.
-     */
-    public Collection<String> getTypeMetadataPaths() {
-        return typeMetadataPaths;
-    }
-
-    /**
-     * Set paths to xml with type mapping description.
-     *
-     * @param typeMetadataPaths Paths to xml.
-     */
-    public void setTypeMetadataPaths(Collection<String> typeMetadataPaths) {
-        this.typeMetadataPaths = typeMetadataPaths;
-    }
-
-    /**
      * Set type mapping description.
      *
      * @param typeMetadata Type mapping description.
@@ -1080,7 +1007,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
      *
      * @return Database dialect.
      */
-    public BasicJdbcDialect getDialect() {
+    public JdbcDialect getDialect() {
         return dialect;
     }
 
@@ -1089,7 +1016,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
      *
      * @param dialect Database dialect.
      */
-    public void setDialect(BasicJdbcDialect dialect) {
+    public void setDialect(JdbcDialect dialect) {
         this.dialect = dialect;
     }
 
@@ -1129,32 +1056,128 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> {
         this.batchSz = batchSz;
     }
 
-    @Override public void txEnd(boolean commit) throws CacheWriterException {
-        // TODO: SPRINT-32 CODE: implement.
-    }
+    private class LoadWorker<K1, V1> implements Callable<Map<K1, V1>> {
+        private final Connection conn;
 
-    @Override public V load(K k) throws CacheLoaderException {
-        return null; // TODO: SPRINT-32 CODE: implement.
-    }
+        private final Collection<K1> keys;
 
-    @Override public Map<K, V> loadAll(Iterable<? extends K> iterable) throws CacheLoaderException {
-        return null; // TODO: SPRINT-32 CODE: implement.
-    }
+        private final EntryMapping m;
 
-    @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
-        // TODO: SPRINT-32 CODE: implement.
-    }
+        private LoadWorker(Connection conn, EntryMapping m) {
+            this.conn = conn;
+            keys = new ArrayList<>(batchSz);
+            this.m = m;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<K1, V1> call() throws Exception {
+            PreparedStatement stmt = null;
+
+            try {
+                stmt = conn.prepareStatement(m.loadQuery(keys.size()));
+
+                int i = 1;
 
-    @Override
-    public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> collection) throws CacheWriterException {
-        // TODO: SPRINT-32 CODE: implement.
+                for (Object key : keys)
+                    for (GridCacheQueryTypeDescriptor field : m.keyDescriptors()) {
+                        Object fieldVal = extractField(m.keyType(), field.getJavaName(), key);
+
+                        if (fieldVal != null)
+                            stmt.setObject(i++, fieldVal);
+                        else
+                            stmt.setNull(i++, field.getDbType());
+                    }
+
+                ResultSet rs = stmt.executeQuery();
+
+                Map<K1, V1> entries = U.newHashMap(keys.size());
+
+                while (rs.next()) {
+                    K1 key = buildObject(m.keyType(), m.keyDescriptors(), rs);
+                    V1 val = buildObject(m.valueType(), m.valueDescriptors(), rs);
+
+                    entries.put(key, val);
+                }
+
+                return entries;
+            }
+            finally {
+                U.closeQuiet(stmt);
+            }
+        }
     }
 
-    @Override public void delete(Object o) throws CacheWriterException {
-        // TODO: SPRINT-32 CODE: implement.
+    private class WriteWorker implements Callable<Void> {
+        private final Connection conn;
+
+        private final Collection<Cache.Entry<?, ?>> entries;
+
+        private final EntryMapping m;
+
+        private WriteWorker(Connection conn, EntryMapping m) {
+            this.conn = conn;
+            entries = new ArrayList<>(batchSz);
+            this.m = m;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() throws Exception {
+            PreparedStatement stmt = null;
+
+            try {
+                stmt = conn.prepareStatement(m.mergeQry);
+
+                for (Cache.Entry<?, ?> entry : entries) {
+                    int i = fillKeyParameters(stmt, m, entry.getKey());
+
+                    fillValueParameters(stmt, i, m, entry.getValue());
+
+                    stmt.addBatch();
+                }
+
+                stmt.executeBatch();
+            }
+            finally {
+                U.closeQuiet(stmt);
+            }
+
+            return null;
+        }
     }
 
-    @Override public void deleteAll(Collection<?> collection) throws CacheWriterException {
-        // TODO: SPRINT-32 CODE: implement.
+    private class DeleteWorker implements Callable<Void> {
+        private final Connection conn;
+
+        private final Collection<Object> keys;
+
+        private final EntryMapping m;
+
+        private DeleteWorker(Connection conn, EntryMapping m) {
+            this.conn = conn;
+            keys = new ArrayList<>(batchSz);
+            this.m = m;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() throws Exception {
+            PreparedStatement stmt = null;
+
+            try {
+                stmt = conn.prepareStatement(m.remQry);
+
+                for (Object key : keys) {
+                    fillKeyParameters(stmt, m, key);
+
+                    stmt.addBatch();
+                }
+
+                stmt.executeBatch();
+            }
+            finally {
+                U.closeQuiet(stmt);
+            }
+
+            return null;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9803b6a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
index 4d9953a..16a67d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
@@ -17,12 +17,13 @@
 
 package org.apache.ignite.cache.store.jdbc;
 
-import org.apache.ignite.*;
 import org.apache.ignite.cache.store.*;
 import org.gridgain.grid.cache.query.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
+import javax.cache.integration.*;
 import java.lang.reflect.*;
 import java.sql.*;
 import java.util.*;
@@ -55,8 +56,7 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> {
          * @param clsName Class name.
          * @param fields Fields.
          */
-        public PojoMethodsCache(String clsName,
-            Collection<GridCacheQueryTypeDescriptor> fields) throws IgniteCheckedException {
+        public PojoMethodsCache(String clsName, Collection<GridCacheQueryTypeDescriptor> fields) throws CacheException {
 
             try {
                 cls = Class.forName(clsName);
@@ -67,10 +67,10 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> {
                     ctor.setAccessible(true);
             }
             catch (ClassNotFoundException e) {
-                throw new IgniteCheckedException("Failed to find class: " + clsName, e);
+                throw new CacheException("Failed to find class: " + clsName, e);
             }
             catch (NoSuchMethodException e) {
-                throw new IgniteCheckedException("Failed to find empty constructor for class: " + clsName, e);
+                throw new CacheException("Failed to find empty constructor for class: " + clsName, e);
             }
 
             setters = U.newHashMap(fields.size());
@@ -88,7 +88,7 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> {
                         getters.put(field.getJavaName(), cls.getMethod("is" + prop));
                     }
                     catch (NoSuchMethodException e) {
-                        throw new IgniteCheckedException("Failed to find getter for property " + field.getJavaName() +
+                        throw new CacheException("Failed to find getter for property " + field.getJavaName() +
                             " of class: " + cls.getName(), e);
                     }
                 }
@@ -97,7 +97,7 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> {
                     setters.put(field.getJavaName(), cls.getMethod("set" + prop, field.getJavaType()));
                 }
                 catch (NoSuchMethodException e) {
-                    throw new IgniteCheckedException("Failed to find setter for property " + field.getJavaName() +
+                    throw new CacheException("Failed to find setter for property " + field.getJavaName() +
                         " of class: " + clsName, e);
                 }
             }
@@ -118,14 +118,14 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> {
          * Construct new instance of pojo object.
          *
          * @return pojo object.
-         * @throws IgniteCheckedException If construct new instance failed.
+         * @throws CacheLoaderException If construct new instance failed.
          */
-        protected Object newInstance() throws IgniteCheckedException {
+        protected Object newInstance() throws CacheLoaderException {
             try {
                 return ctor.newInstance();
             }
             catch (Exception e) {
-                throw new IgniteCheckedException("Failed to create new instance for class: " + cls, e);
+                throw new CacheLoaderException("Failed to create new instance for class: " + cls, e);
             }
         }
     }
@@ -134,8 +134,8 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> {
     protected Map<String, PojoMethodsCache> mtdsCache;
 
     /** {@inheritDoc} */
-    @Override protected void buildTypeCache() throws IgniteCheckedException {
-        entryQtyCache = U.newHashMap(typeMetadata.size());
+    @Override protected void buildTypeCache() throws CacheException {
+        typeMeta = U.newHashMap(typeMetadata.size());
 
         mtdsCache = U.newHashMap(typeMetadata.size() * 2);
 
@@ -144,15 +144,19 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> {
 
             mtdsCache.put(type.getKeyType(), keyCache);
 
-            entryQtyCache.put(keyCache.cls, new QueryCache(dialect, type));
+            typeMeta.put(keyCache.cls, new EntryMapping(dialect, type));
 
             mtdsCache.put(type.getType(), new PojoMethodsCache(type.getType(), type.getValueDescriptors()));
         }
+
+        typeMeta = Collections.unmodifiableMap(typeMeta);
+
+        mtdsCache = Collections.unmodifiableMap(mtdsCache);
     }
 
     /** {@inheritDoc} */
     @Override protected <R> R buildObject(String typeName, Collection<GridCacheQueryTypeDescriptor> fields,
-        ResultSet rs) throws IgniteCheckedException {
+        ResultSet rs) throws CacheLoaderException {
         PojoMethodsCache t = mtdsCache.get(typeName);
 
         Object obj = t.newInstance();
@@ -164,25 +168,28 @@ public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> {
             return (R)obj;
         }
         catch (Exception e) {
-            throw new IgniteCheckedException("Failed to read object of class: " + typeName, e);
+            throw new CacheLoaderException("Failed to read object of class: " + typeName, e);
         }
     }
 
     /** {@inheritDoc} */
     @Nullable @Override protected Object extractField(String typeName, String fieldName, Object obj)
-        throws IgniteCheckedException {
+        throws CacheException {
         try {
             PojoMethodsCache mc = mtdsCache.get(typeName);
 
             return mc.getters.get(fieldName).invoke(obj);
         }
         catch (Exception e) {
-            throw new IgniteCheckedException("Failed to read object of class: " + typeName, e);
+            throw new CacheException("Failed to read object of class: " + typeName, e);
         }
     }
 
     /** {@inheritDoc} */
-    @Override protected Object typeKey(Object key) {
-        return key.getClass();
+    @Override protected Object typeId(Object key) throws CacheException {
+        if (key != null)
+            return key.getClass();
+
+        throw new CacheException();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9803b6a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
index e6b4367..03e6f8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
@@ -25,7 +25,7 @@ import java.util.*;
 /**
  * Represents a dialect of SQL implemented by a particular RDBMS.
  */
-public class BasicJdbcDialect {
+public class BasicJdbcDialect implements JdbcDialect {
     /** Default max query parameters count. */
     protected static final int DFLT_MAX_PARAMS_CNT = 2000;
 
@@ -145,29 +145,13 @@ public class BasicJdbcDialect {
         return sb.toString();
     }
 
-    /**
-     * Construct load cache query.
-     *
-     * @param schema Database schema name.
-     * @param tblName Database table name.
-     * @param uniqCols Database unique value columns.
-     * @return Load cache query.
-     */
-    public String loadCacheQuery(String schema, String tblName, Iterable<String> uniqCols) {
+    /** {@inheritDoc} */
+    @Override public String loadCacheQuery(String schema, String tblName, Iterable<String> uniqCols) {
         return String.format("SELECT %s FROM %s.%s", mkString(uniqCols, ","), schema, tblName);
     }
 
-    /**
-     * Construct load query.
-     *
-     * @param schema Database schema name.
-     * @param tblName Database table name.
-     * @param keyCols Database key columns.
-     * @param cols Selected columns.
-     * @param keyCnt Key count.
-     * @return Load query.
-     */
-    public String loadQuery(String schema, String tblName, Collection<String> keyCols, Iterable<String> cols,
+    /** {@inheritDoc} */
+    @Override public String loadQuery(String schema, String tblName, Collection<String> keyCols, Iterable<String> cols,
         int keyCnt) {
         assert !keyCols.isEmpty();
 
@@ -176,30 +160,18 @@ public class BasicJdbcDialect {
         return String.format("SELECT %s FROM %s.%s WHERE %s", mkString(cols, ","), schema, tblName, params);
     }
 
-    /**
-     * Construct insert query.
-     *
-     * @param schema Database schema name.
-     * @param tblName Database table name.
-     * @param keyCols Database key columns.
-     * @param valCols Database value columns.
-     */
-    public String insertQuery(String schema, String tblName, Collection<String> keyCols, Collection<String> valCols) {
+    /** {@inheritDoc} */
+    @Override public String insertQuery(String schema, String tblName, Collection<String> keyCols,
+        Collection<String> valCols) {
         Collection<String> cols = F.concat(false, keyCols, valCols);
 
         return String.format("INSERT INTO %s.%s(%s) VALUES(%s)", schema, tblName, mkString(cols, ","),
             repeat("?", cols.size(), "", ",", ""));
     }
 
-    /**
-     * Construct update query.
-     *
-     * @param schema Database schema name.
-     * @param tblName Database table name.
-     * @param keyCols Database key columns.
-     * @param valCols Database value columns.
-     */
-    public String updateQuery(String schema, String tblName, Collection<String> keyCols, Iterable<String> valCols) {
+    /** {@inheritDoc} */
+    @Override public String updateQuery(String schema, String tblName, Collection<String> keyCols,
+        Iterable<String> valCols) {
         String params = mkString(valCols, new C1<String, String>() {
             @Override public String apply(String s) {
                 return s + "=?";
@@ -209,35 +181,19 @@ public class BasicJdbcDialect {
         return String.format("UPDATE %s.%s SET %s WHERE %s", schema, tblName, params, where(keyCols, 1));
     }
 
-    /**
-     * @return {@code True} if database support merge operation.
-     */
-    public boolean hasMerge() {
+    /** {@inheritDoc} */
+    @Override public boolean hasMerge() {
         return false;
     }
 
-    /**
-     * Construct merge query.
-     *
-     * @param schema Database schema name.
-     * @param tblName Database table name.
-     * @param keyCols Database key columns.
-     * @param uniqCols Database unique value columns.
-     * @return Put query.
-     */
-    public String mergeQuery(String schema, String tblName, Collection<String> keyCols, Collection<String> uniqCols) {
+    /** {@inheritDoc} */
+    @Override public String mergeQuery(String schema, String tblName, Collection<String> keyCols,
+        Collection<String> uniqCols) {
         return "";
     }
 
-    /**
-     * Construct remove query.
-     *
-     * @param schema Database schema name.
-     * @param tblName Database table name.
-     * @param keyCols Database key columns.
-     * @return Remove query.
-     */
-    public String removeQuery(String schema, String tblName, Iterable<String> keyCols) {
+    /** {@inheritDoc} */
+    @Override public String removeQuery(String schema, String tblName, Iterable<String> keyCols) {
         String whereParams = mkString(keyCols, new C1<String, String>() {
             @Override public String apply(String s) {
                 return s + "=?";
@@ -247,12 +203,8 @@ public class BasicJdbcDialect {
         return String.format("DELETE FROM %s.%s WHERE %s", schema, tblName, whereParams);
     }
 
-    /**
-     * Get max query parameters count.
-     *
-     * @return Max query parameters count.
-     */
-    public int getMaxParamsCnt() {
+    /** {@inheritDoc} */
+    @Override public int getMaxParamsCnt() {
         return maxParamsCnt;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9803b6a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
new file mode 100644
index 0000000..e0d80e9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc.dialect;
+
+import java.util.*;
+
+/**
+ * Represents a dialect of SQL implemented by a particular RDBMS.
+ */
+public interface JdbcDialect {
+    /**
+     * Construct load cache query.
+     *
+     * @param schema Database schema name.
+     * @param tblName Database table name.
+     * @param uniqCols Database unique value columns.
+     * @return Load cache query.
+     */
+    public String loadCacheQuery(String schema, String tblName, Iterable<String> uniqCols);
+
+    /**
+     * Construct load query.
+     *
+     * @param schema Database schema name.
+     * @param tblName Database table name.
+     * @param keyCols Database key columns.
+     * @param cols Selected columns.
+     * @param keyCnt Key count.
+     * @return Load query.
+     */
+    public String loadQuery(String schema, String tblName, Collection<String> keyCols, Iterable<String> cols,
+        int keyCnt);
+
+    /**
+     * Construct insert query.
+     *
+     * @param schema Database schema name.
+     * @param tblName Database table name.
+     * @param keyCols Database key columns.
+     * @param valCols Database value columns.
+     */
+    public String insertQuery(String schema, String tblName, Collection<String> keyCols, Collection<String> valCols);
+
+    /**
+     * Construct update query.
+     *
+     * @param schema Database schema name.
+     * @param tblName Database table name.
+     * @param keyCols Database key columns.
+     * @param valCols Database value columns.
+     */
+    public String updateQuery(String schema, String tblName, Collection<String> keyCols, Iterable<String> valCols);
+
+    /**
+     * @return {@code True} if database supports merge operation.
+     */
+    public boolean hasMerge();
+
+    /**
+     * Construct merge query.
+     *
+     * @param schema Database schema name.
+     * @param tblName Database table name.
+     * @param keyCols Database key columns.
+     * @param uniqCols Database unique value columns.
+     * @return Merge query.
+     */
+    public String mergeQuery(String schema, String tblName, Collection<String> keyCols, Collection<String> uniqCols);
+
+    /**
+     * Construct remove query.
+     *
+     * @param schema Database schema name.
+     * @param tblName Database table name.
+     * @param keyCols Database key columns.
+     * @return Remove query.
+     */
+    public String removeQuery(String schema, String tblName, Iterable<String> keyCols);
+
+    /**
+     * Get max query parameters count.
+     *
+     * @return Max query parameters count.
+     */
+    public int getMaxParamsCnt();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9803b6a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java
index f2688d3..e061d7a 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreMultitreadedSelfTest.java
@@ -29,6 +29,7 @@ import org.springframework.beans.factory.xml.*;
 import org.springframework.context.support.*;
 import org.springframework.core.io.*;
 
+import javax.cache.configuration.*;
 import java.io.*;
 import java.net.*;
 import java.util.*;
@@ -49,7 +50,7 @@ public class PojoCacheStoreMultitreadedSelfTest extends AbstractCacheStoreMultit
         UrlResource metaUrl;
 
         try {
-            metaUrl = new UrlResource(new File("modules/core/src/test/config/store/auto/all.xml").toURI().toURL());
+            metaUrl = new UrlResource(new File("modules/core/src/test/config/store/jdbc/all.xml").toURI().toURL());
         }
         catch (MalformedURLException e) {
             throw new IgniteCheckedException("Failed to resolve metadata path [err=" + e.getMessage() + ']', e);
@@ -97,7 +98,10 @@ public class PojoCacheStoreMultitreadedSelfTest extends AbstractCacheStoreMultit
         cc.setSwapEnabled(false);
         cc.setWriteBehindEnabled(false);
 
-        // TODO: IGNITE-32 FIXME cc.setStore(store);
+        cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
+        cc.setReadThrough(true);
+        cc.setWriteThrough(true);
+        cc.setLoadPreviousValue(true);
 
         c.setCacheConfiguration(cc);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9803b6a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java
index fa77385..83e95ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/PojoCacheStoreSelfTest.java
@@ -18,13 +18,17 @@
 package org.apache.ignite.cache.store.jdbc;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cache.store.jdbc.model.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.transactions.*;
 import org.gridgain.grid.cache.query.*;
+import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.testframework.*;
 import org.gridgain.testframework.junits.cache.GridAbstractCacheStoreSelfTest.*;
+import org.gridgain.testframework.junits.cache.*;
 import org.gridgain.testframework.junits.common.*;
 import org.h2.jdbcx.*;
 import org.jetbrains.annotations.*;
@@ -33,6 +37,7 @@ import org.springframework.beans.factory.xml.*;
 import org.springframework.context.support.*;
 import org.springframework.core.io.*;
 
+import javax.cache.*;
 import java.io.*;
 import java.net.*;
 import java.sql.*;
@@ -54,6 +59,9 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
     protected static final int PERSON_CNT = 100000;
 
     /** */
+    protected TestThreadLocalCacheSession ses = new TestThreadLocalCacheSession();
+
+    /** */
     protected final JdbcPojoCacheStore store;
 
     /**
@@ -79,7 +87,7 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
         UrlResource metaUrl;
 
         try {
-            metaUrl = new UrlResource(new File("modules/core/src/test/config/store/auto/all.xml").toURI().toURL());
+            metaUrl = new UrlResource(new File("modules/core/src/test/config/store/jdbc/all.xml").toURI().toURL());
         }
         catch (MalformedURLException e) {
             throw new IgniteCheckedException("Failed to resolve metadata path [err=" + e.getMessage() + ']', e);
@@ -116,6 +124,8 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
      */
     protected void inject(JdbcCacheStore store) throws Exception {
         getTestResources().inject(store);
+
+        GridTestUtils.setFieldValue(store, CacheStore.class, "ses", ses);
     }
 
     /**
@@ -175,8 +185,8 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
         assertEquals(ORGANIZATION_CNT, orgKeys.size());
         assertEquals(PERSON_CNT, prnKeys.size());
 
-        store.removeAll(null, orgKeys);
-        store.removeAll(null, prnKeys);
+        store.deleteAll(orgKeys);
+        store.deleteAll(prnKeys);
 
         orgKeys.clear();
         prnKeys.clear();
@@ -194,31 +204,40 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
         // Create dummy transaction
         IgniteTx tx = new DummyTx();
 
+        ses.newSession(tx);
+
         OrganizationKey k1 = new OrganizationKey(1);
         Organization v1 = new Organization(1, "Name1", "City1");
 
         OrganizationKey k2 = new OrganizationKey(2);
         Organization v2 = new Organization(2, "Name2", "City2");
 
-        store.put(tx, k1, v1);
-        store.put(tx, k2, v2);
+        store.write(new CacheEntryImpl<>(k1, v1));
+        store.write(new CacheEntryImpl<>(k2, v2));
+
+        store.txEnd(true);
+
+        ses.newSession(null);
 
-        store.txEnd(tx, true);
+        assertEquals(v1, store.load(k1));
+        assertEquals(v2, store.load(k2));
 
-        assertEquals(v1, store.load(null, k1));
-        assertEquals(v2, store.load(null, k2));
+        ses.newSession(tx);
 
         OrganizationKey k3 = new OrganizationKey(3);
 
-        assertNull(store.load(tx, k3));
+        assertNull(store.load(k3));
 
-        store.remove(tx, k1);
+        store.delete(k1);
 
-        store.txEnd(tx, true);
+        store.txEnd(true);
 
-        assertNull(store.load(tx, k1));
-        assertEquals(v2, store.load(tx, k2));
-        assertNull(store.load(null, k3));
+        assertNull(store.load(k1));
+        assertEquals(v2, store.load(k2));
+
+        ses.newSession(null);
+
+        assertNull(store.load(k3));
     }
 
     /**
@@ -227,118 +246,140 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
     public void testRollback() throws IgniteCheckedException {
         IgniteTx tx = new DummyTx();
 
+        ses.newSession(tx);
+
         OrganizationKey k1 = new OrganizationKey(1);
         Organization v1 = new Organization(1, "Name1", "City1");
 
         // Put.
-        store.put(tx, k1, v1);
+        store.write(new CacheEntryImpl<>(k1, v1));
 
-        store.txEnd(tx, false); // Rollback.
+        store.txEnd(false); // Rollback.
 
         tx = new DummyTx();
 
-        assertNull(store.load(tx, k1));
+        ses.newSession(tx);
+
+        assertNull(store.load(k1));
 
         OrganizationKey k2 = new OrganizationKey(2);
         Organization v2 = new Organization(2, "Name2", "City2");
 
         // Put all.
-        assertNull(store.load(tx, k2));
+        assertNull(store.load(k2));
+
+        Collection<Cache.Entry<?, ?>> col = new ArrayList<>();
 
-        store.putAll(tx, Collections.singletonMap(k2, v2));
+        col.add(new CacheEntryImpl<>(k2, v2));
 
-        store.txEnd(tx, false); // Rollback.
+        store.writeAll(col);
+
+        store.txEnd(false); // Rollback.
 
         tx = new DummyTx();
 
-        assertNull(store.load(tx, k2));
+        ses.newSession(tx);
+
+        assertNull(store.load(k2));
 
         OrganizationKey k3 = new OrganizationKey(3);
         Organization v3 = new Organization(3, "Name3", "City3");
 
-        store.putAll(tx, Collections.singletonMap(k3, v3));
+        col = new ArrayList<>();
+
+        col.add(new CacheEntryImpl<>(k3, v3));
 
-        store.txEnd(tx, true); // Commit.
+        store.writeAll(col);
+
+        store.txEnd(true); // Commit.
 
         tx = new DummyTx();
 
-        assertEquals(v3, store.load(tx, k3));
+        ses.newSession(tx);
+
+        assertEquals(v3, store.load(k3));
 
         OrganizationKey k4 = new OrganizationKey(4);
         Organization v4 = new Organization(4, "Name4", "City4");
 
-        store.put(tx, k4, v4);
+        store.write(new CacheEntryImpl<>(k4, v4));
 
-        store.txEnd(tx, false); // Rollback.
+        store.txEnd(false); // Rollback.
 
         tx = new DummyTx();
 
-        assertNull(store.load(tx, k4));
+        ses.newSession(tx);
+
+        assertNull(store.load(k4));
 
-        assertEquals(v3, store.load(tx, k3));
+        assertEquals(v3, store.load(k3));
 
         // Remove.
-        store.remove(tx, k3);
+        store.delete(k3);
 
-        store.txEnd(tx, false); // Rollback.
+        store.txEnd(false); // Rollback.
 
         tx = new DummyTx();
 
-        assertEquals(v3, store.load(tx, k3));
+        ses.newSession(tx);
+
+        assertEquals(v3, store.load(k3));
 
         // Remove all.
-        store.removeAll(tx, Arrays.asList(k3));
+        store.deleteAll(Arrays.asList(k3));
 
-        store.txEnd(tx, false); // Rollback.
+        store.txEnd(false); // Rollback.
 
         tx = new DummyTx();
 
-        assertEquals(v3, store.load(tx, k3));
+        ses.newSession(tx);
+
+        assertEquals(v3, store.load(k3));
     }
 
     /**
-     * @throws IgniteCheckedException if failed.
      */
-    public void testAllOpsWithTXNoCommit() throws IgniteCheckedException {
+    public void testAllOpsWithTXNoCommit() {
         doTestAllOps(new DummyTx(), false);
     }
 
     /**
-     * @throws IgniteCheckedException if failed.
      */
-    public void testAllOpsWithTXCommit() throws IgniteCheckedException {
+    public void testAllOpsWithTXCommit() {
         doTestAllOps(new DummyTx(), true);
     }
 
     /**
-     * @throws IgniteCheckedException if failed.
      */
-    public void testAllOpsWithoutTX() throws IgniteCheckedException {
+    public void testAllOpsWithoutTX() {
         doTestAllOps(null, false);
     }
 
     /**
      * @param tx Transaction.
      * @param commit Commit.
-     * @throws IgniteCheckedException If failed.
      */
-    private void doTestAllOps(@Nullable IgniteTx tx, boolean commit) throws IgniteCheckedException {
+    private void doTestAllOps(@Nullable IgniteTx tx, boolean commit) {
         try {
+            ses.newSession(tx);
+
             final OrganizationKey k1 = new OrganizationKey(1);
             final Organization v1 = new Organization(1, "Name1", "City1");
 
-            store.put(tx, k1, v1);
+            store.write(new CacheEntryImpl<>(k1, v1));
 
             if (tx != null && commit) {
-                store.txEnd(tx, true);
+                store.txEnd(true);
 
                 tx = new DummyTx();
+
+                ses.newSession(tx);
             }
 
             if (tx == null || commit)
-                assertEquals(v1, store.load(tx, k1));
+                assertEquals(v1, store.load(k1));
 
-            Map<OrganizationKey, Organization> m = new HashMap<>();
+            Collection<Cache.Entry<?, ?>> col = new ArrayList<>();
 
             final OrganizationKey k2 = new OrganizationKey(2);
             final Organization v2 = new Organization(2, "Name2", "City2");
@@ -346,15 +387,17 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
             final OrganizationKey k3 = new OrganizationKey(3);
             final Organization v3 = new Organization(3, "Name3", "City3");
 
-            m.put(k2, v2);
-            m.put(k3, v3);
+            col.add(new CacheEntryImpl<>(k2, v2));
+            col.add(new CacheEntryImpl<>(k3, v3));
 
-            store.putAll(tx, m);
+            store.writeAll(col);
 
             if (tx != null && commit) {
-                store.txEnd(tx, true);
+                store.txEnd(true);
 
                 tx = new DummyTx();
+
+                ses.newSession(tx);
             }
 
             final AtomicInteger cntr = new AtomicInteger();
@@ -362,55 +405,62 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
             final OrganizationKey no_such_key = new OrganizationKey(4);
 
             if (tx == null || commit) {
-                store.loadAll(tx, Arrays.asList(k1, k2, k3, no_such_key), new CI2<Object, Object>() {
-                    @Override public void apply(Object o, Object o1) {
-                        if (k1.equals(o))
-                            assertEquals(v1, o1);
+                Map<Object, Object> loaded = store.loadAll(Arrays.asList(k1, k2, k3, no_such_key));
 
-                        if (k2.equals(o))
-                            assertEquals(v2, o1);
+                for (Map.Entry<Object, Object> e : loaded.entrySet()) {
+                    Object key = e.getKey();
+                    Object val = e.getValue();
 
-                        if (k3.equals(o))
-                            assertEquals(v3, o1);
+                    if (k1.equals(key))
+                        assertEquals(v1, val);
 
-                        if (no_such_key.equals(o))
-                            fail();
+                    if (k2.equals(key))
+                        assertEquals(v2, val);
 
-                        cntr.incrementAndGet();
-                    }
-                });
+                    if (k3.equals(key))
+                        assertEquals(v3, val);
+
+                    if (no_such_key.equals(key))
+                        fail();
+
+                    cntr.incrementAndGet();
+                }
 
                 assertEquals(3, cntr.get());
             }
 
-            store.removeAll(tx, Arrays.asList(k2, k3));
+            store.deleteAll(Arrays.asList(k2, k3));
 
             if (tx != null && commit) {
-                store.txEnd(tx, true);
+                store.txEnd(true);
 
                 tx = new DummyTx();
+
+                ses.newSession(tx);
             }
 
             if (tx == null || commit) {
-                assertNull(store.load(tx, k2));
-                assertNull(store.load(tx, k3));
-                assertEquals(v1, store.load(tx, k1));
+                assertNull(store.load(k2));
+                assertNull(store.load(k3));
+                assertEquals(v1, store.load(k1));
             }
 
-            store.remove(tx, k1);
+            store.delete(k1);
 
             if (tx != null && commit) {
-                store.txEnd(tx, true);
+                store.txEnd(true);
 
                 tx = new DummyTx();
+
+                ses.newSession(tx);
             }
 
             if (tx == null || commit)
-                assertNull(store.load(tx, k1));
+                assertNull(store.load(k1));
         }
         finally {
             if (tx != null)
-                store.txEnd(tx, false);
+                store.txEnd(false);
         }
     }
 
@@ -427,6 +477,8 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
                 for (int i = 0; i < 1000; i++) {
                     IgniteTx tx = rnd.nextBoolean() ? new DummyTx() : null;
 
+                    ses.newSession(tx);
+
                     int op = rnd.nextInt(10);
 
                     boolean queueEmpty = false;
@@ -438,29 +490,22 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
                             queueEmpty = true;
                         else {
                             if (rnd.nextBoolean())
-                                assertNotNull(store.load(tx, key));
+                                assertNotNull(store.load(key));
                             else {
-                                final AtomicInteger cntr = new AtomicInteger();
-
-                                store.loadAll(tx, Collections.singleton(key), new CI2<Object, Object>() {
-                                    @Override public void apply(Object o, Object o1) {
-                                        cntr.incrementAndGet();
+                                Map<Object, Object> loaded = store.loadAll(Collections.singleton(key));
 
-                                        assertNotNull(o);
-                                        assertNotNull(o1);
+                                assertEquals(1, loaded.size());
 
-                                        OrganizationKey key = (OrganizationKey)o;
-                                        Organization val = (Organization)o1;
+                                Map.Entry<Object, Object> e = loaded.entrySet().iterator().next();
 
-                                        assertTrue(key.getId().equals(val.getId()));
-                                    }
-                                });
+                                OrganizationKey k = (OrganizationKey)e.getKey();
+                                Organization v = (Organization)e.getValue();
 
-                                assertEquals(1, cntr.get());
+                                assertTrue(k.getId().equals(v.getId()));
                             }
 
                             if (tx != null)
-                                store.txEnd(tx, true);
+                                store.txEnd(true);
 
                             queue.add(key);
                         }
@@ -472,12 +517,12 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
                             queueEmpty = true;
                         else {
                             if (rnd.nextBoolean())
-                                store.remove(tx, key);
+                                store.delete(key);
                             else
-                                store.removeAll(tx, Collections.singleton(key));
+                                store.deleteAll(Collections.singleton(key));
 
                             if (tx != null)
-                                store.txEnd(tx, true);
+                                store.txEnd(true);
                         }
                     }
                     else { // Update.
@@ -489,13 +534,20 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
                             Organization val =
                                 new Organization(key.getId(), "Name" + key.getId(), "City" + key.getId());
 
+                            Cache.Entry<OrganizationKey, Organization> entry = new CacheEntryImpl<>(key, val);
+
                             if (rnd.nextBoolean())
-                                store.put(tx, key, val);
-                            else
-                                store.putAll(tx, Collections.singletonMap(key, val));
+                                store.write(entry);
+                            else {
+                                Collection<Cache.Entry<?, ?>> col = new ArrayList<>();
+
+                                col.add(entry);
+
+                                store.writeAll(col);
+                            }
 
                             if (tx != null)
-                                store.txEnd(tx, true);
+                                store.txEnd(true);
 
                             queue.add(key);
                         }
@@ -505,13 +557,20 @@ public class PojoCacheStoreSelfTest extends GridCommonAbstractTest {
                         OrganizationKey key = new OrganizationKey(rnd.nextInt());
                         Organization val = new Organization(key.getId(), "Name" + key.getId(), "City" + key.getId());
 
+                        Cache.Entry<OrganizationKey, Organization> entry = new CacheEntryImpl<>(key, val);
+
                         if (rnd.nextBoolean())
-                            store.put(tx, key, val);
-                        else
-                            store.putAll(tx, Collections.singletonMap(key, val));
+                            store.write(entry);
+                        else {
+                            Collection<Cache.Entry<?, ?>> col = new ArrayList<>();
+
+                            col.add(entry);
+
+                            store.writeAll(col);
+                        }
 
                         if (tx != null)
-                            store.txEnd(tx, true);
+                            store.txEnd(true);
 
                         queue.add(key);
                     }


Mime
View raw message