ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [13/42] incubator-ignite git commit: # IGNITE-32: Renaming JDBC store class.
Date Fri, 06 Feb 2015 15:21:17 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f55d1983/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
deleted file mode 100644
index a4392f1..0000000
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
+++ /dev/null
@@ -1,1560 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.jdbc;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.cache.store.jdbc.dialect.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.lifecycle.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.transactions.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-import javax.cache.integration.*;
-import javax.sql.*;
-import java.sql.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
-
-import static java.sql.Statement.*;
-
-/**
- * Base {@link CacheStore} implementation backed by JDBC. This implementation stores objects in underlying database
- * using mapping description.
- * <p>
- * <h2 class="header">Configuration</h2>
- * The sections below describe mandatory and optional configuration settings and
- * provide examples using Java and Spring XML.
- * <h3>Mandatory</h3>
- * There are no mandatory configuration parameters.
- * <h3>Optional</h3>
- * <ul>
- *     <li>Data source (see {@link #setDataSource(DataSource)})</li>
- *     <li>Maximum batch size for writeAll and deleteAll operations. (see {@link #setBatchSize(int)})</li>
- *     <li>Maximum worker thread count. These threads are responsible for loading the cache. (see {@link #setMaxPoolSize(int)})</li>
- *     <li>Parallel load cache minimum threshold. (see {@link #setParallelLoadCacheMinimumThreshold(int)})</li>
- * </ul>
- * <h2 class="header">Java Example</h2>
- * <pre name="code" class="java">
- *     ...
- *     JdbcPojoCacheStore store = new JdbcPojoCacheStore();
- *     ...
- *
- * </pre>
- * <h2 class="header">Spring Example</h2>
- * <pre name="code" class="xml">
- *     ...
- *     &lt;bean id=&quot;cache.jdbc.store&quot;
- *         class=&quot;org.apache.ignite.cache.store.jdbc.JdbcPojoCacheStore&quot;&gt;
- *         &lt;property name=&quot;connectionUrl&quot; value=&quot;jdbc:h2:mem:&quot;/&gt;
- *     &lt;/bean&gt;
- *     ...
- * </pre>
- * <p>
- * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
- */
-public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements LifecycleAware {
-    /** Max attempt write count. */
-    protected static final int MAX_ATTEMPT_WRITE_COUNT = 2;
-
-    /** Default batch size for put and remove operations. */
-    protected static final int DFLT_BATCH_SIZE = 512;
-
-    /** Default parallel load cache minimum threshold. */
-    protected static final int DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD = 512;
-
-    /** Connection attribute property name. */
-    protected static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION";
-
-    /** Empty column value. */
-    protected static final Object[] EMPTY_COLUMN_VALUE = new Object[] { null };
-
-    /** Auto-injected logger instance. */
-    @IgniteLoggerResource
-    protected IgniteLogger log;
-
-    /** Lock for metadata cache. */
-    @GridToStringExclude
-    private final Lock cacheMappingsLock = new ReentrantLock();
-
-    /** Data source. */
-    protected DataSource dataSrc;
-
-    /** Cache with entry mapping description. (cache name, (key id, mapping description)). */
-    protected volatile Map<String, Map<Object, EntryMapping>> cacheMappings = Collections.emptyMap();
-
-    /** Database dialect. */
-    protected JdbcDialect dialect;
-
-    /** Maximum worker thread count. These threads are responsible for loading the cache. */
-    private int maxPoolSz = Runtime.getRuntime().availableProcessors();
-
-    /** Maximum batch size for writeAll and deleteAll operations. */
-    private int batchSz = DFLT_BATCH_SIZE;
-
-    /** Parallel load cache minimum threshold. If {@code 0} then load sequentially. */
-    private int parallelLoadCacheMinThreshold = DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
-
-    /**
-     * Get field value from object.
-     *
-     * @param typeName Type name.
-     * @param fieldName Field name.
-     * @param obj Cache object.
-     * @return Field value from object.
-     */
-    @Nullable protected abstract Object extractField(String typeName, String fieldName, Object obj)
-        throws CacheException;
-
-    /**
-     * Construct object from query result.
-     *
-     * @param <R> Type of result object.
-     * @param typeName Type name.
-     * @param fields Fields descriptors.
-     * @param loadColIdxs Select query columns index.
-     * @param rs ResultSet.
-     * @return Constructed object.
-     */
-    protected abstract <R> R buildObject(String typeName, Collection<CacheTypeFieldMetadata> fields,
-        Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException;
-
-    /**
-     * Extract key type id from key object.
-     *
-     * @param key Key object.
-     * @return Key type id.
-     */
-    protected abstract Object keyTypeId(Object key) throws CacheException;
-
-    /**
-     * Extract key type id from key class name.
-     *
-     * @param type String description of key type.
-     * @return Key type id.
-     */
-    protected abstract Object keyTypeId(String type) throws CacheException;
-
-    /**
-     * Prepare internal store specific builders for provided types metadata.
-     *
-     * @param types Collection of types.
-     * @throws CacheException If failed to prepare.
-     */
-    protected abstract void prepareBuilders(@Nullable String cacheName, Collection<CacheTypeMetadata> types)
-        throws CacheException;
-
-    /**
-     * Perform dialect resolution.
-     *
-     * @return The resolved dialect.
-     * @throws CacheException Indicates problems accessing the metadata.
-     */
-    protected JdbcDialect resolveDialect() throws CacheException {
-        Connection conn = null;
-
-        String dbProductName = null;
-
-        try {
-            conn = openConnection(false);
-
-            dbProductName = conn.getMetaData().getDatabaseProductName();
-        }
-        catch (SQLException e) {
-            throw new CacheException("Failed access to metadata for detect database dialect.", e);
-        }
-        finally {
-            U.closeQuiet(conn);
-        }
-
-        if ("H2".equals(dbProductName))
-            return new H2Dialect();
-
-        if ("MySQL".equals(dbProductName))
-            return new MySQLDialect();
-
-        if (dbProductName.startsWith("Microsoft SQL Server"))
-            return new SQLServerDialect();
-
-        if ("Oracle".equals(dbProductName))
-            return new OracleDialect();
-
-        if (dbProductName.startsWith("DB2/"))
-            return new DB2Dialect();
-
-        U.warn(log, "Failed to resolve dialect (BasicJdbcDialect will be used): " + dbProductName);
-
-        return new BasicJdbcDialect();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteException {
-        if (dataSrc == null)
-            throw new IgniteException("Failed to initialize cache store (data source is not provided).");
-
-        if (dialect == null)
-            dialect = resolveDialect();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop() throws IgniteException {
-        // No-op.
-    }
-
-    /**
-     * Gets connection from a pool.
-     *
-     * @param autocommit {@code true} If connection should use autocommit mode.
-     * @return Pooled connection.
-     * @throws SQLException In case of error.
-     */
-    protected Connection openConnection(boolean autocommit) throws SQLException {
-        Connection conn = dataSrc.getConnection();
-
-        conn.setAutoCommit(autocommit);
-
-        return conn;
-    }
-
-    /**
-     * @return Connection.
-     * @throws SQLException In case of error.
-     */
-    protected Connection connection() throws SQLException {
-        CacheStoreSession ses = session();
-
-        if (ses.transaction() != null) {
-            Map<String, Connection> prop = ses.properties();
-
-            Connection conn = prop.get(ATTR_CONN_PROP);
-
-            if (conn == null) {
-                conn = openConnection(false);
-
-                // Store connection in session to use it for other operations in the same session.
-                prop.put(ATTR_CONN_PROP, conn);
-            }
-
-            return conn;
-        }
-        // Transaction can be null in case of simple load operation.
-        else
-            return openConnection(true);
-    }
-
-    /**
-     * Closes connection.
-     *
-     * @param conn Connection to close.
-     */
-    protected void closeConnection(@Nullable Connection conn) {
-        CacheStoreSession ses = session();
-
-        // Close connection right away if there is no transaction.
-        if (ses.transaction() == null)
-            U.closeQuiet(conn);
-    }
-
-    /**
-     * Closes allocated resources depending on transaction status.
-     *
-     * @param conn Allocated connection.
-     * @param st Created statement.
-     */
-    protected void end(@Nullable Connection conn, @Nullable Statement st) {
-        U.closeQuiet(st);
-
-        closeConnection(conn);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void txEnd(boolean commit) throws CacheWriterException {
-        CacheStoreSession ses = session();
-
-        IgniteTx tx = ses.transaction();
-
-        Connection conn = ses.<String, Connection>properties().remove(ATTR_CONN_PROP);
-
-        if (conn != null) {
-            assert tx != null;
-
-            try {
-                if (commit)
-                    conn.commit();
-                else
-                    conn.rollback();
-            }
-            catch (SQLException e) {
-                throw new CacheWriterException(
-                    "Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e);
-            }
-            finally {
-                U.closeQuiet(conn);
-            }
-        }
-
-        if (tx != null && log.isDebugEnabled())
-            log.debug("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']');
-    }
-
-    /**
-     * Retrieves the value of the designated column in the current row of the given <code>ResultSet</code> object and
-     * converts it to the requested Java data type.
-     *
-     * @param rs Result set.
-     * @param colIdx Column index in result set.
-     * @param type Class representing the Java data type to convert the designated column to.
-     * @return Value in column.
-     * @throws SQLException If a database access error occurs or this method is called on a closed result set.
-     */
-    protected Object getColumnValue(ResultSet rs, int colIdx, Class<?> type) throws SQLException {
-        if (type == int.class)
-            return rs.getInt(colIdx);
-
-        if (type == long.class)
-            return rs.getLong(colIdx);
-
-        if (type == double.class)
-            return rs.getDouble(colIdx);
-
-        if (type == boolean.class)
-            return rs.getBoolean(colIdx);
-
-        if (type == byte.class)
-            return rs.getByte(colIdx);
-
-        if (type == short.class)
-            return rs.getShort(colIdx);
-
-        if (type == float.class)
-            return rs.getFloat(colIdx);
-
-        if (type == Integer.class || type == Long.class || type == Double.class ||
-            type == Byte.class || type == Short.class ||  type == Float.class) {
-            Object val = rs.getObject(colIdx);
-
-            if (val != null) {
-                Number num = (Number)val;
-
-                if (type == Integer.class)
-                    return num.intValue();
-                else if (type == Long.class)
-                    return num.longValue();
-                else if (type == Double.class)
-                    return num.doubleValue();
-                else if (type == Byte.class)
-                    return num.byteValue();
-                else if (type == Short.class)
-                    return num.shortValue();
-                else if (type == Float.class)
-                    return num.floatValue();
-            }
-            else
-                return EMPTY_COLUMN_VALUE;
-        }
-
-        return rs.getObject(colIdx);
-    }
-
-    /**
-     * Construct load cache from range.
-     *
-     * @param em Type mapping description.
-     * @param clo Closure that will be applied to loaded values.
-     * @param lowerBound Lower bound for range.
-     * @param upperBound Upper bound for range.
-     * @return Callable for pool submit.
-     */
-    private Callable<Void> loadCacheRange(final EntryMapping em, final IgniteBiInClosure<K, V> clo,
-        @Nullable final Object[] lowerBound, @Nullable final Object[] upperBound) {
-        return new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                Connection conn = null;
-
-                PreparedStatement stmt = null;
-
-                try {
-                    conn = openConnection(true);
-
-                    stmt = conn.prepareStatement(lowerBound == null && upperBound == null
-                        ? em.loadCacheQry
-                        : em.loadCacheRangeQuery(lowerBound != null, upperBound != null));
-
-                    int ix = 1;
-
-                    if (lowerBound != null)
-                        for (int i = lowerBound.length; i > 0; i--)
-                            for (int j = 0; j < i; j++)
-                                stmt.setObject(ix++, lowerBound[j]);
-
-                    if (upperBound != null)
-                        for (int i = upperBound.length; i > 0; i--)
-                            for (int j = 0; j < i; j++)
-                                stmt.setObject(ix++, upperBound[j]);
-
-                    ResultSet rs = stmt.executeQuery();
-
-                    while (rs.next()) {
-                        K key = buildObject(em.keyType(), em.keyColumns(), em.loadColIdxs, rs);
-                        V val = buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
-
-                        clo.apply(key, val);
-                    }
-                }
-                catch (SQLException e) {
-                    throw new IgniteCheckedException("Failed to load cache", e);
-                }
-                finally {
-                    U.closeQuiet(stmt);
-
-                    U.closeQuiet(conn);
-                }
-
-                return null;
-            }
-        };
-    }
-
-    /**
-     * Construct load cache in one select.
-     *
-     * @param m Type mapping description.
-     * @param clo Closure for loaded values.
-     * @return Callable for pool submit.
-     */
-    private Callable<Void> loadCacheFull(EntryMapping m, IgniteBiInClosure<K, V> clo) {
-        return loadCacheRange(m, clo, null, null);
-    }
-
-    /**
-     * @return Type mappings for specified cache name.
-     * @throws CacheException If failed to initialize.
-     */
-    private Map<Object, EntryMapping> cacheMappings(@Nullable String cacheName) throws CacheException {
-        Map<Object, EntryMapping> entryMappings = cacheMappings.get(cacheName);
-
-        if (entryMappings != null)
-            return entryMappings;
-
-        cacheMappingsLock.lock();
-
-        try {
-            entryMappings = cacheMappings.get(cacheName);
-
-            if (entryMappings != null)
-                return entryMappings;
-
-            Collection<CacheTypeMetadata> types = ignite().cache(session().cacheName()).configuration()
-                .getTypeMetadata();
-
-            entryMappings = U.newHashMap(types.size());
-
-            for (CacheTypeMetadata type : types)
-                entryMappings.put(keyTypeId(type.getKeyType()), new EntryMapping(dialect, type));
-
-            Map<String, Map<Object, EntryMapping>> mappings = new HashMap<>(cacheMappings);
-
-            mappings.put(cacheName, entryMappings);
-
-            prepareBuilders(cacheName, types);
-
-            cacheMappings = mappings;
-
-            return entryMappings;
-        }
-        finally {
-            cacheMappingsLock.unlock();
-        }
-    }
-
-    /**
-     * @param keyTypeId Key type id.
-     * @param key Key object.
-     * @return Entry mapping.
-     * @throws CacheException if mapping for key was not found.
-     */
-    private EntryMapping entryMapping(Object keyTypeId, Object key) throws CacheException {
-        String cacheName = session().cacheName();
-
-        EntryMapping em = cacheMappings(cacheName).get(keyTypeId);
-
-        if (em == null)
-            throw new CacheException("Failed to find mapping description [key=" + key +
-                ", cache=" + (cacheName != null ? cacheName : "<default>") + "]");
-
-        return em;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void loadCache(final IgniteBiInClosure<K, V> clo, @Nullable Object... args)
-        throws CacheLoaderException {
-        try {
-            ExecutorService pool = Executors.newFixedThreadPool(maxPoolSz);
-
-            Collection<Future<?>> futs = new ArrayList<>();
-
-            if (args != null && args.length > 0) {
-                if (args.length % 2 != 0)
-                    throw new CacheLoaderException("Expected even number of arguments, but found: " + args.length);
-
-                if (log.isDebugEnabled())
-                    log.debug("Start loading entries from db using user queries from arguments");
-
-                for (int i = 0; i < args.length; i += 2) {
-                    String keyType = args[i].toString();
-
-                    String selQry = args[i + 1].toString();
-
-                    EntryMapping em = entryMapping(keyTypeId(keyType), keyType);
-
-                    futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry, clo)));
-                }
-            }
-            else {
-                Collection<EntryMapping> entryMappings = cacheMappings(session().cacheName()).values();
-
-                if (log.isDebugEnabled())
-                    log.debug("Start loading all cache types entries from db");
-
-                for (EntryMapping em : entryMappings) {
-                    if (parallelLoadCacheMinThreshold > 0) {
-                        Connection conn = null;
-
-                        try {
-                            conn = connection();
-
-                            PreparedStatement stmt = conn.prepareStatement(em.loadCacheSelRangeQry);
-
-                            stmt.setInt(1, parallelLoadCacheMinThreshold);
-
-                            ResultSet rs = stmt.executeQuery();
-
-                            if (rs.next()) {
-                                int keyCnt = em.keyCols.size();
-
-                                Object[] upperBound = new Object[keyCnt];
-
-                                for (int i = 0; i < keyCnt; i++)
-                                    upperBound[i] = rs.getObject(i + 1);
-
-                                futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound)));
-
-                                while (rs.next()) {
-                                    Object[] lowerBound = upperBound;
-
-                                    upperBound = new Object[keyCnt];
-
-                                    for (int i = 0; i < keyCnt; i++)
-                                        upperBound[i] = rs.getObject(i + 1);
-
-                                    futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound)));
-                                }
-
-                                futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null)));
-                            }
-                            else
-                                futs.add(pool.submit(loadCacheFull(em, clo)));
-                        }
-                        catch (SQLException ignored) {
-                            futs.add(pool.submit(loadCacheFull(em, clo)));
-                        }
-                        finally {
-                            U.closeQuiet(conn);
-                        }
-                    }
-                    else
-                        futs.add(pool.submit(loadCacheFull(em, clo)));
-                }
-            }
-
-            for (Future<?> fut : futs)
-                U.get(fut);
-        }
-        catch (IgniteCheckedException e) {
-            throw new CacheLoaderException("Failed to load cache", e.getCause());
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public V load(K key) throws CacheLoaderException {
-        assert key != null;
-
-        EntryMapping em = entryMapping(keyTypeId(key), key);
-
-        if (log.isDebugEnabled())
-            log.debug("Start load value from database [table= " + em.fullTableName() + ", key=" + key + "]");
-
-        Connection conn = null;
-
-        PreparedStatement stmt = null;
-
-        try {
-            conn = connection();
-
-            stmt = conn.prepareStatement(em.loadQrySingle);
-
-            fillKeyParameters(stmt, em, key);
-
-            ResultSet rs = stmt.executeQuery();
-
-            if (rs.next())
-                return buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
-        }
-        catch (SQLException e) {
-            throw new CacheLoaderException("Failed to load object [table=" + em.fullTableName() +
-                ", key=" + key + "]", e);
-        }
-        finally {
-            end(conn, stmt);
-        }
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
-        assert keys != null;
-
-        Connection conn = null;
-
-        try {
-            conn = connection();
-
-            Map<Object, LoadWorker<K, V>> workers = U.newHashMap(cacheMappings(session().cacheName()).size());
-
-            Map<K, V> res = new HashMap<>();
-
-            for (K key : keys) {
-                Object keyTypeId = keyTypeId(key);
-
-                EntryMapping em = entryMapping(keyTypeId, key);
-
-                LoadWorker<K, V> worker = workers.get(keyTypeId);
-
-                if (worker == null)
-                    workers.put(keyTypeId, worker = new LoadWorker<>(conn, em));
-
-                worker.keys.add(key);
-
-                if (worker.keys.size() == em.maxKeysPerStmt)
-                    res.putAll(workers.remove(keyTypeId).call());
-            }
-
-            for (LoadWorker<K, V> worker : workers.values())
-                res.putAll(worker.call());
-
-            return res;
-        }
-        catch (Exception e) {
-            throw new CacheWriterException("Failed to load entries from database", e);
-        }
-        finally {
-            closeConnection(conn);
-        }
-    }
-
-    /**
-     * @param insStmt Insert statement.
-     * @param updStmt Update statement.
-     * @param em Entry mapping.
-     * @param entry Cache entry.
-     */
-    private void writeUpsert(PreparedStatement insStmt, PreparedStatement updStmt,
-        EntryMapping em, Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
-        try {
-            CacheWriterException we = null;
-
-            for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT; attempt++) {
-                int paramIdx = fillValueParameters(updStmt, 1, em, entry.getValue());
-
-                fillKeyParameters(updStmt, paramIdx, em, entry.getKey());
-
-                if (updStmt.executeUpdate() == 0) {
-                    paramIdx = fillKeyParameters(insStmt, em, entry.getKey());
-
-                    fillValueParameters(insStmt, paramIdx, em, entry.getValue());
-
-                    try {
-                        insStmt.executeUpdate();
-
-                        if (attempt > 0)
-                            U.warn(log, "Entry was inserted in database on second try [table=" + em.fullTableName() +
-                                ", entry=" + entry + "]");
-                    }
-                    catch (SQLException e) {
-                        String sqlState = e.getSQLState();
-
-                        SQLException nested = e.getNextException();
-
-                        while (sqlState == null && nested != null) {
-                            sqlState = nested.getSQLState();
-
-                            nested = nested.getNextException();
-                        }
-
-                        // The error with code 23505 or 23000 is thrown when trying to insert a row that
-                        // would violate a unique index or primary key.
-                        if ("23505".equals(sqlState) || "23000".equals(sqlState)) {
-                            if (we == null)
-                                we = new CacheWriterException("Failed insert entry in database, violate a unique" +
-                                    " index or primary key [table=" + em.fullTableName() + ", entry=" + entry + "]");
-
-                            we.addSuppressed(e);
-
-                            U.warn(log, "Failed insert entry in database, violate a unique index or primary key" +
-                                " [table=" + em.fullTableName() + ", entry=" + entry + "]");
-
-                            continue;
-                        }
-
-                        throw new CacheWriterException("Failed insert entry in database [table=" + em.fullTableName() +
-                            ", entry=" + entry, e);
-                    }
-                }
-
-                if (attempt > 0)
-                    U.warn(log, "Entry was updated in database on second try [table=" + em.fullTableName() +
-                        ", entry=" + entry + "]");
-
-                return;
-            }
-
-            throw we;
-        }
-        catch (SQLException e) {
-            throw new CacheWriterException("Failed update entry in database [table=" + em.fullTableName() +
-                ", entry=" + entry + "]", e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
-        assert entry != null;
-
-        K key = entry.getKey();
-
-        EntryMapping em = entryMapping(keyTypeId(key), key);
-
-        if (log.isDebugEnabled())
-            log.debug("Start write entry to database [table=" + em.fullTableName() + ", entry=" + entry + "]");
-
-        Connection conn = null;
-
-        try {
-            conn = connection();
-
-            if (dialect.hasMerge()) {
-                PreparedStatement stmt = null;
-
-                try {
-                    stmt = conn.prepareStatement(em.mergeQry);
-
-                    int i = fillKeyParameters(stmt, em, key);
-
-                    fillValueParameters(stmt, i, em, entry.getValue());
-
-                    int updCnt = stmt.executeUpdate();
-
-                    if (updCnt != 1)
-                        U.warn(log, "Unexpected number of updated entries [table=" + em.fullTableName() +
-                            ", entry=" + entry + "expected=1, actual=" + updCnt + "]");
-                }
-                finally {
-                    U.closeQuiet(stmt);
-                }
-            }
-            else {
-                PreparedStatement insStmt = null;
-
-                PreparedStatement updStmt = null;
-
-                try {
-                    insStmt = conn.prepareStatement(em.insQry);
-
-                    updStmt = conn.prepareStatement(em.updQry);
-
-                    writeUpsert(insStmt, updStmt, em, entry);
-                }
-                finally {
-                    U.closeQuiet(insStmt);
-
-                    U.closeQuiet(updStmt);
-                }
-            }
-        }
-        catch (SQLException e) {
-            throw new CacheWriterException("Failed to write entry to database [table=" + em.fullTableName() +
-                ", entry=" + entry + "]", e);
-        }
-        finally {
-            closeConnection(conn);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeAll(final Collection<Cache.Entry<? extends K, ? extends V>> entries)
-        throws CacheWriterException {
-        assert entries != null;
-
-        Connection conn = null;
-
-        try {
-            conn = connection();
-
-            Object currKeyTypeId = null;
-
-            if (dialect.hasMerge()) {
-                PreparedStatement mergeStmt = null;
-
-                try {
-                    EntryMapping em = null;
-
-                    LazyValue<Object[]> lazyEntries = new LazyValue<Object[]>() {
-                        @Override public Object[] create() {
-                            return entries.toArray();
-                        }
-                    };
-
-                    int fromIdx = 0, prepared = 0;
-
-                    for (Cache.Entry<? extends K, ? extends V> entry : entries) {
-                        K key = entry.getKey();
-
-                        Object keyTypeId = keyTypeId(key);
-
-                        em = entryMapping(keyTypeId, key);
-
-                        if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
-                            if (mergeStmt != null) {
-                                executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries);
-
-                                U.closeQuiet(mergeStmt);
-                            }
-
-                            mergeStmt = conn.prepareStatement(em.mergeQry);
-
-                            currKeyTypeId = keyTypeId;
-
-                            prepared = 0;
-                        }
-
-                        int i = fillKeyParameters(mergeStmt, em, key);
-
-                        fillValueParameters(mergeStmt, i, em, entry.getValue());
-
-                        mergeStmt.addBatch();
-
-                        if (++prepared % batchSz == 0) {
-                            executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries);
-
-                            prepared = 0;
-                        }
-                    }
-
-                    if (mergeStmt != null && prepared % batchSz != 0)
-                        executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries);
-                }
-                finally {
-                    U.closeQuiet(mergeStmt);
-                }
-            }
-            else {
-                PreparedStatement insStmt = null;
-
-                PreparedStatement updStmt = null;
-
-                try {
-                    for (Cache.Entry<? extends K, ? extends V> entry : entries) {
-                        K key = entry.getKey();
-
-                        Object keyTypeId = keyTypeId(key);
-
-                        EntryMapping em = entryMapping(keyTypeId, key);
-
-                        if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
-                            U.closeQuiet(insStmt);
-
-                            insStmt = conn.prepareStatement(em.insQry);
-
-                            U.closeQuiet(updStmt);
-
-                            updStmt = conn.prepareStatement(em.updQry);
-
-                            currKeyTypeId = keyTypeId;
-                        }
-
-                        writeUpsert(insStmt, updStmt, em, entry);
-                    }
-                }
-                finally {
-                    U.closeQuiet(insStmt);
-
-                    U.closeQuiet(updStmt);
-                }
-            }
-        }
-        catch (SQLException e) {
-            throw new CacheWriterException("Failed to write entries in database", e);
-        }
-        finally {
-            closeConnection(conn);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void delete(Object key) throws CacheWriterException {
-        assert key != null;
-
-        EntryMapping em = entryMapping(keyTypeId(key), key);
-
-        if (log.isDebugEnabled())
-            log.debug("Start remove value from database [table=" + em.fullTableName() + ", key=" + key + "]");
-
-        Connection conn = null;
-
-        PreparedStatement stmt = null;
-
-        try {
-            conn = connection();
-
-            stmt = conn.prepareStatement(em.remQry);
-
-            fillKeyParameters(stmt, em, key);
-
-            int delCnt = stmt.executeUpdate();
-
-            if (delCnt != 1)
-                U.warn(log, "Unexpected number of deleted entries [table=" + em.fullTableName() + ", key=" + key +
-                    "expected=1, actual=" + delCnt + "]");
-        }
-        catch (SQLException e) {
-            throw new CacheWriterException("Failed to remove value from database [table=" + em.fullTableName() +
-                ", key=" + key + "]", e);
-        }
-        finally {
-            end(conn, stmt);
-        }
-    }
-
-    /**
-     * @param em Entry mapping.
-     * @param stmt Statement.
-     * @param desc Statement description for error message.
-     * @param fromIdx Objects in batch start from index.
-     * @param prepared Expected objects in batch.
-     * @param lazyObjs All objects used in batch statement as array.
-     */
-    private void executeBatch(EntryMapping em, Statement stmt, String desc, int fromIdx, int prepared,
-        LazyValue<Object[]> lazyObjs) throws SQLException {
-        try {
-            int[] rowCounts = stmt.executeBatch();
-
-            int numOfRowCnt = rowCounts.length;
-
-            if (numOfRowCnt != prepared)
-                U.warn(log, "Unexpected number of updated rows [table=" + em.fullTableName() + ", expected=" + prepared +
-                    ", actual=" + numOfRowCnt + "]");
-
-            for (int i = 0; i < numOfRowCnt; i++) {
-                int cnt = rowCounts[i];
-
-                if (cnt != 1 && cnt != SUCCESS_NO_INFO) {
-                    Object[] objs = lazyObjs.value();
-
-                    U.warn(log, "Batch " + desc + " returned unexpected updated row count [table=" + em.fullTableName() +
-                        ", entry=" + objs[fromIdx + i] + ", expected=1, actual=" + cnt + "]");
-                }
-            }
-        }
-        catch (BatchUpdateException be) {
-            int[] rowCounts = be.getUpdateCounts();
-
-            for (int i = 0; i < rowCounts.length; i++) {
-                if (rowCounts[i] == EXECUTE_FAILED) {
-                    Object[] objs = lazyObjs.value();
-
-                    U.warn(log, "Batch " + desc + " failed on execution [table=" + em.fullTableName() +
-                        ", entry=" + objs[fromIdx + i] + "]");
-                }
-            }
-
-            throw be;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void deleteAll(final Collection<?> keys) throws CacheWriterException {
-        assert keys != null;
-
-        Connection conn = null;
-
-        try {
-            conn = connection();
-
-            Object currKeyTypeId = null;
-
-            EntryMapping em = null;
-
-            PreparedStatement delStmt = null;
-
-            LazyValue<Object[]> lazyKeys = new LazyValue<Object[]>() {
-                @Override public Object[] create() {
-                    return keys.toArray();
-                }
-            };
-
-            int fromIdx = 0, prepared = 0;
-
-            for (Object key : keys) {
-                Object keyTypeId = keyTypeId(key);
-
-                em = entryMapping(keyTypeId, key);
-
-                if (delStmt == null) {
-                    delStmt = conn.prepareStatement(em.remQry);
-
-                    currKeyTypeId = keyTypeId;
-                }
-
-                if (!currKeyTypeId.equals(keyTypeId)) {
-                    executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys);
-
-                    fromIdx += prepared;
-
-                    prepared = 0;
-
-                    currKeyTypeId = keyTypeId;
-                }
-
-                fillKeyParameters(delStmt, em, key);
-
-                delStmt.addBatch();
-
-                if (++prepared % batchSz == 0) {
-                    executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys);
-
-                    fromIdx += prepared;
-
-                    prepared = 0;
-                }
-            }
-
-            if (delStmt != null && prepared % batchSz != 0)
-                executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys);
-        }
-        catch (SQLException e) {
-            throw new CacheWriterException("Failed to remove values from database", e);
-        }
-        finally {
-            closeConnection(conn);
-        }
-    }
-
-    /**
-     * @param stmt Prepare statement.
-     * @param i Start index for parameters.
-     * @param em Entry mapping.
-     * @param key Key object.
-     * @return Next index for parameters.
-     */
-    protected int fillKeyParameters(PreparedStatement stmt, int i, EntryMapping em,
-        Object key) throws CacheException {
-        for (CacheTypeFieldMetadata field : em.keyColumns()) {
-            Object fieldVal = extractField(em.keyType(), field.getJavaName(), key);
-
-            try {
-                if (fieldVal != null)
-                    stmt.setObject(i++, fieldVal);
-                else
-                    stmt.setNull(i++, field.getDatabaseType());
-            }
-            catch (SQLException e) {
-                throw new CacheException("Failed to set statement parameter name: " + field.getDatabaseName(), e);
-            }
-        }
-
-        return i;
-    }
-
-    /**
-     * @param stmt Prepare statement.
-     * @param m Type mapping description.
-     * @param key Key object.
-     * @return Next index for parameters.
-     */
-    protected int fillKeyParameters(PreparedStatement stmt, EntryMapping m, Object key) throws CacheException {
-        return fillKeyParameters(stmt, 1, m, key);
-    }
-
-    /**
-     * @param stmt Prepare statement.
-     * @param i Start index for parameters.
-     * @param m Type mapping description.
-     * @param val Value object.
-     * @return Next index for parameters.
-     */
-    protected int fillValueParameters(PreparedStatement stmt, int i, EntryMapping m, Object val)
-        throws CacheWriterException {
-        for (CacheTypeFieldMetadata field : m.uniqValFields) {
-            Object fieldVal = extractField(m.valueType(), field.getJavaName(), val);
-
-            try {
-                if (fieldVal != null)
-                    stmt.setObject(i++, fieldVal);
-                else
-                    stmt.setNull(i++, field.getDatabaseType());
-            }
-            catch (SQLException e) {
-                throw new CacheWriterException("Failed to set statement parameter name: " + field.getDatabaseName(), e);
-            }
-        }
-
-        return i;
-    }
-
-    /**
-     * @return Data source.
-     */
-    public DataSource getDataSource() {
-        return dataSrc;
-    }
-
-    /**
-     * @param dataSrc Data source.
-     */
-    public void setDataSource(DataSource dataSrc) {
-        this.dataSrc = dataSrc;
-    }
-
-    /**
-     * Get database dialect.
-     *
-     * @return Database dialect.
-     */
-    public JdbcDialect getDialect() {
-        return dialect;
-    }
-
-    /**
-     * Set database dialect.
-     *
-     * @param dialect Database dialect.
-     */
-    public void setDialect(JdbcDialect dialect) {
-        this.dialect = dialect;
-    }
-
-    /**
-     * Get maximum worker thread count. These threads are responsible for executing queries.
-     *
-     * @return Max workers thread count.
-     */
-    public int getMaxPoolSize() {
-        return maxPoolSz;
-    }
-
-    /**
-     * Set maximum worker thread count. These threads are responsible for executing queries.
-     *
-     * @param maxPoolSz Max workers thread count.
-     */
-    public void setMaxPoolSize(int maxPoolSz) {
-        this.maxPoolSz = maxPoolSz;
-    }
-
-    /**
-     * Get maximum batch size for write and delete operations.
-     *
-     * @return Maximum batch size.
-     */
-    public int getBatchSize() {
-        return batchSz;
-    }
-
-    /**
-     * Set maximum batch size for write and delete operations.
-     *
-     * @param batchSz Maximum batch size.
-     */
-    public void setBatchSize(int batchSz) {
-        this.batchSz = batchSz;
-    }
-
-    /**
-     * Parallel load cache minimum row count threshold.
-     *
-     * @return If {@code 0} then load sequentially.
-     */
-    public int getParallelLoadCacheMinimumThreshold() {
-        return parallelLoadCacheMinThreshold;
-    }
-
-    /**
-     * Parallel load cache minimum row count threshold.
-     *
-     * @param parallelLoadCacheMinThreshold Minimum row count threshold. If {@code 0} then load sequentially.
-     */
-    public void setParallelLoadCacheMinimumThreshold(int parallelLoadCacheMinThreshold) {
-        this.parallelLoadCacheMinThreshold = parallelLoadCacheMinThreshold;
-    }
-
-    /**
-     * Entry mapping description.
-     */
-    protected static class EntryMapping {
-        /** Database dialect. */
-        private final JdbcDialect dialect;
-
-        /** Select border for range queries. */
-        protected final String loadCacheSelRangeQry;
-
-        /** Select all items query. */
-        protected final String loadCacheQry;
-
-        /** Select item query. */
-        protected final String loadQrySingle;
-
-        /** Select items query. */
-        private final String loadQry;
-
-        /** Merge item(s) query. */
-        protected final String mergeQry;
-
-        /** Insert item query. */
-        protected final String insQry;
-
-        /** Update item query. */
-        protected final String updQry;
-
-        /** Remove item(s) query. */
-        protected final String remQry;
-
-        /** Max key count for load query per statement. */
-        protected final int maxKeysPerStmt;
-
-        /** Database key columns. */
-        private final Collection<String> keyCols;
-
-        /** Database key and unique value columns. */
-        private final Collection<String> cols;
-
-        /** Select query columns index. */
-        private final Map<String, Integer> loadColIdxs;
-
-        /** Unique value fields. */
-        private final Collection<CacheTypeFieldMetadata> uniqValFields;
-
-        /** Type metadata. */
-        private final CacheTypeMetadata typeMeta;
-
-        /** Full table name. */
-        private final String fullTblName;
-
-        /**
-         * @param typeMeta Type metadata.
-         */
-        public EntryMapping(JdbcDialect dialect, CacheTypeMetadata typeMeta) {
-            this.dialect = dialect;
-
-            this.typeMeta = typeMeta;
-
-            final Collection<CacheTypeFieldMetadata> keyFields = typeMeta.getKeyFields();
-
-            Collection<CacheTypeFieldMetadata> valFields = typeMeta.getValueFields();
-
-            uniqValFields = F.view(valFields, new IgnitePredicate<CacheTypeFieldMetadata>() {
-                @Override public boolean apply(CacheTypeFieldMetadata col) {
-                    return !keyFields.contains(col);
-                }
-            });
-
-            String schema = typeMeta.getDatabaseSchema();
-
-            String tblName = typeMeta.getDatabaseTable();
-
-            fullTblName = F.isEmpty(schema) ? tblName : schema + "." + tblName;
-
-            keyCols = databaseColumns(keyFields);
-
-            Collection<String> uniqValCols = databaseColumns(uniqValFields);
-
-            cols = F.concat(false, keyCols, uniqValCols);
-
-            loadColIdxs = U.newHashMap(cols.size());
-
-            int idx = 1;
-
-            for (String col : cols)
-                loadColIdxs.put(col, idx++);
-
-            loadCacheQry = dialect.loadCacheQuery(fullTblName, cols);
-
-            loadCacheSelRangeQry = dialect.loadCacheSelectRangeQuery(fullTblName, keyCols);
-
-            loadQrySingle = dialect.loadQuery(fullTblName, keyCols, cols, 1);
-
-            maxKeysPerStmt = dialect.getMaxParamsCnt() / keyCols.size();
-
-            loadQry = dialect.loadQuery(fullTblName, keyCols, cols, maxKeysPerStmt);
-
-            insQry = dialect.insertQuery(fullTblName, keyCols, uniqValCols);
-
-            updQry = dialect.updateQuery(fullTblName, keyCols, uniqValCols);
-
-            mergeQry = dialect.mergeQuery(fullTblName, keyCols, uniqValCols);
-
-            remQry = dialect.removeQuery(fullTblName, keyCols);
-        }
-
-        /**
-         * Extract database column names from {@link CacheTypeFieldMetadata}.
-         *
-         * @param dsc collection of {@link CacheTypeFieldMetadata}.
-         */
-        private static Collection<String> databaseColumns(Collection<CacheTypeFieldMetadata> dsc) {
-            return F.transform(dsc, new C1<CacheTypeFieldMetadata, String>() {
-                /** {@inheritDoc} */
-                @Override public String apply(CacheTypeFieldMetadata col) {
-                    return col.getDatabaseName();
-                }
-            });
-        }
-
-        /**
-         * Construct query to select values for a key count less than or equal to {@code maxKeysPerStmt}.
-         *
-         * @param keyCnt Key count.
-         */
-        protected String loadQuery(int keyCnt) {
-            assert keyCnt <= maxKeysPerStmt;
-
-            if (keyCnt == maxKeysPerStmt)
-                return loadQry;
-
-            if (keyCnt == 1)
-                return loadQrySingle;
-
-            return dialect.loadQuery(fullTblName, keyCols, cols, keyCnt);
-        }
-
-        /**
-         * Construct query for select values in range.
-         *
-         * @param appendLowerBound Need add lower bound for range.
-         * @param appendUpperBound Need add upper bound for range.
-         * @return Query with range.
-         */
-        protected String loadCacheRangeQuery(boolean appendLowerBound, boolean appendUpperBound) {
-            return dialect.loadCacheRangeQuery(fullTblName, keyCols, cols, appendLowerBound, appendUpperBound);
-        }
-
-        /** Key type. */
-        protected String keyType() {
-            return typeMeta.getKeyType();
-        }
-
-        /** Value type. */
-        protected String valueType() {
-            return typeMeta.getValueType();
-        }
-
-        /**
-         * Gets key columns.
-         *
-         * @return Key columns.
-         */
-        protected Collection<CacheTypeFieldMetadata> keyColumns() {
-            return typeMeta.getKeyFields();
-        }
-
-        /**
-         * Gets value columns.
-         *
-         * @return Value columns.
-         */
-        protected Collection<CacheTypeFieldMetadata> valueColumns() {
-            return typeMeta.getValueFields();
-        }
-
-        /**
-         * Get full table name.
-         *
-         * @return &lt;schema&gt;.&lt;table name&gt;
-         */
-        protected String fullTableName() {
-            return fullTblName;
-        }
-    }
-
-    /**
-     * Worker for load cache using custom user query.
-     *
-     * @param <K1> Key type.
-     * @param <V1> Value type.
-     */
-    private class LoadCacheCustomQueryWorker<K1, V1> implements Callable<Void> {
-        /** Entry mapping description. */
-        private final EntryMapping em;
-
-        /** User query. */
-        private final String qry;
-
-        /** Closure for loaded values. */
-        private final IgniteBiInClosure<K1, V1> clo;
-
-        /**
-         * @param em Entry mapping description.
-         * @param qry User query.
-         * @param clo Closure for loaded values.
-         */
-        private LoadCacheCustomQueryWorker(EntryMapping em, String qry, IgniteBiInClosure<K1, V1> clo) {
-            this.em = em;
-            this.qry = qry;
-            this.clo = clo;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Void call() throws Exception {
-            Connection conn = null;
-
-            PreparedStatement stmt = null;
-
-            try {
-                conn = openConnection(true);
-
-                stmt = conn.prepareStatement(qry);
-
-                ResultSet rs = stmt.executeQuery();
-
-                while (rs.next()) {
-                    K1 key = buildObject(em.keyType(), em.keyColumns(), em.loadColIdxs, rs);
-                    V1 val = buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
-
-                    clo.apply(key, val);
-                }
-
-                return null;
-            }
-            catch (SQLException e) {
-                throw new CacheLoaderException("Failed to execute custom query for load cache", e);
-            }
-            finally {
-                U.closeQuiet(stmt);
-
-                U.closeQuiet(conn);
-            }
-        }
-    }
-
-    /**
-     * Lazy initialization of value.
-     *
-     * @param <T> Cached object type
-     */
-    private abstract static class LazyValue<T> {
-        /** Cached value. */
-        private T val;
-
-        /**
-         * @return Construct value.
-         */
-        protected abstract T create();
-
-        /**
-         * @return Value.
-         */
-        public T value() {
-            if (val == null)
-                val = create();
-
-            return val;
-        }
-    }
-
-    /**
-     * Worker for load by keys.
-     *
-     * @param <K1> Key type.
-     * @param <V1> Value type.
-     */
-    private class LoadWorker<K1, V1> implements Callable<Map<K1, V1>> {
-        /** Connection. */
-        private final Connection conn;
-
-        /** Keys for load. */
-        private final Collection<K1> keys;
-
-        /** Entry mapping description. */
-        private final EntryMapping em;
-
-        /**
-         * @param conn Connection.
-         * @param em Entry mapping description.
-         */
-        private LoadWorker(Connection conn, EntryMapping em) {
-            this.conn = conn;
-            this.em = em;
-
-            keys = new ArrayList<>(em.maxKeysPerStmt);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Map<K1, V1> call() throws Exception {
-            PreparedStatement stmt = null;
-
-            try {
-                stmt = conn.prepareStatement(em.loadQuery(keys.size()));
-
-                int i = 1;
-
-                for (Object key : keys)
-                    for (CacheTypeFieldMetadata field : em.keyColumns()) {
-                        Object fieldVal = extractField(em.keyType(), field.getJavaName(), key);
-
-                        if (fieldVal != null)
-                            stmt.setObject(i++, fieldVal);
-                        else
-                            stmt.setNull(i++, field.getDatabaseType());
-                    }
-
-                ResultSet rs = stmt.executeQuery();
-
-                Map<K1, V1> entries = U.newHashMap(keys.size());
-
-                while (rs.next()) {
-                    K1 key = buildObject(em.keyType(), em.keyColumns(), em.loadColIdxs, rs);
-                    V1 val = buildObject(em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
-
-                    entries.put(key, val);
-                }
-
-                return entries;
-            }
-            finally {
-                U.closeQuiet(stmt);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f55d1983/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
deleted file mode 100644
index e80fa62..0000000
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcPojoCacheStore.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.jdbc;
-
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-import javax.cache.integration.*;
-import java.lang.reflect.*;
-import java.sql.*;
-import java.util.*;
-
-/**
- * Implementation of {@link CacheStore} backed by JDBC and POJO via reflection.
- *
- * This implementation stores objects in underlying database using java beans mapping description via reflection.
- */
-public class JdbcPojoCacheStore extends JdbcCacheStore<Object, Object> {
-    /**
-     * POJO methods cache.
-     */
-    protected static class PojoMethodsCache {
-        /** POJO class. */
-        protected final Class<?> cls;
-
-        /** Constructor for POJO object. */
-        private final Constructor ctor;
-
-        /** Cached getters for POJO object. */
-        private final Map<String, Method> getters;
-
-        /** Cached setters for POJO object. */
-        private final Map<String, Method> setters;
-
-        /**
-         * POJO methods cache.
-         *
-         * @param clsName Class name.
-         * @param fields Fields.
-         */
-        public PojoMethodsCache(String clsName, Collection<CacheTypeFieldMetadata> fields) throws CacheException {
-            try {
-                cls = Class.forName(clsName);
-
-                ctor = cls.getDeclaredConstructor();
-
-                if (!ctor.isAccessible())
-                    ctor.setAccessible(true);
-            }
-            catch (ClassNotFoundException e) {
-                throw new CacheException("Failed to find class: " + clsName, e);
-            }
-            catch (NoSuchMethodException e) {
-                throw new CacheException("Failed to find empty constructor for class: " + clsName, e);
-            }
-
-            setters = U.newHashMap(fields.size());
-
-            getters = U.newHashMap(fields.size());
-
-            for (CacheTypeFieldMetadata field : fields) {
-                String prop = capitalFirst(field.getJavaName());
-
-                try {
-                    getters.put(field.getJavaName(), cls.getMethod("get" + prop));
-                }
-                catch (NoSuchMethodException ignored) {
-                    try {
-                        getters.put(field.getJavaName(), cls.getMethod("is" + prop));
-                    }
-                    catch (NoSuchMethodException e) {
-                        throw new CacheException("Failed to find getter for property " + field.getJavaName() +
-                            " of class: " + cls.getName(), e);
-                    }
-                }
-
-                try {
-                    setters.put(field.getJavaName(), cls.getMethod("set" + prop, field.getJavaType()));
-                }
-                catch (NoSuchMethodException e) {
-                    throw new CacheException("Failed to find setter for property " + field.getJavaName() +
-                        " of class: " + clsName, e);
-                }
-            }
-        }
-
-        /**
-         * Capitalizes the first character of the given string.
-         *
-         * @param str String.
-         * @return String with capitalized first character.
-         */
-        @Nullable private String capitalFirst(@Nullable String str) {
-            return str == null ? null :
-                str.isEmpty() ? "" : Character.toUpperCase(str.charAt(0)) + str.substring(1);
-        }
-
-        /**
-         * Construct new instance of pojo object.
-         *
-         * @return pojo object.
-         * @throws CacheLoaderException If construct new instance failed.
-         */
-        protected Object newInstance() throws CacheLoaderException {
-            try {
-                return ctor.newInstance();
-            }
-            catch (Exception e) {
-                throw new CacheLoaderException("Failed to create new instance for class: " + cls, e);
-            }
-        }
-    }
-
-    /** Methods cache. */
-    protected volatile Map<String, Map<String, PojoMethodsCache>> mtdsCache = Collections.emptyMap();
-
-    /** {@inheritDoc} */
-    @Override protected void prepareBuilders(@Nullable String cacheName, Collection<CacheTypeMetadata> types)
-        throws CacheException {
-        Map<String, PojoMethodsCache> typeMethods = U.newHashMap(types.size() * 2);
-
-        for (CacheTypeMetadata type : types) {
-            String keyType = type.getKeyType();
-            typeMethods.put(keyType, new PojoMethodsCache(keyType, type.getKeyFields()));
-
-            String valType = type.getValueType();
-            typeMethods.put(valType, new PojoMethodsCache(valType, type.getValueFields()));
-        }
-
-        Map<String, Map<String, PojoMethodsCache>> newMtdsCache = new HashMap<>(mtdsCache);
-
-        newMtdsCache.put(cacheName, typeMethods);
-
-        mtdsCache = newMtdsCache;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected <R> R buildObject(String typeName, Collection<CacheTypeFieldMetadata> fields,
-        Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException {
-        PojoMethodsCache mc = mtdsCache.get(session().cacheName()).get(typeName);
-
-        Object obj = mc.newInstance();
-
-        try {
-            for (CacheTypeFieldMetadata field : fields) {
-                Method setter = mc.setters.get(field.getJavaName());
-
-                Integer colIdx = loadColIdxs.get(field.getDatabaseName());
-
-                setter.invoke(obj, getColumnValue(rs, colIdx, field.getJavaType()));
-            }
-
-            return (R)obj;
-        }
-        catch (Exception e) {
-            throw new CacheLoaderException("Failed to read object of class: " + typeName, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override protected Object extractField(String typeName, String fieldName, Object obj)
-        throws CacheException {
-        try {
-            PojoMethodsCache mc = mtdsCache.get(session().cacheName()).get(typeName);
-
-            return mc.getters.get(fieldName).invoke(obj);
-        }
-        catch (Exception e) {
-            throw new CacheException("Failed to read object of class: " + typeName, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Object keyTypeId(Object key) throws CacheException {
-        return key.getClass();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Object keyTypeId(String type) throws CacheException {
-        try {
-            return Class.forName(type);
-        }
-        catch (ClassNotFoundException e) {
-            throw new CacheException("Failed to find class: " + type, e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f55d1983/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractJdbcCacheStoreMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractJdbcCacheStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractJdbcCacheStoreMultithreadedSelfTest.java
deleted file mode 100644
index c263ceb..0000000
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/AbstractJdbcCacheStoreMultithreadedSelfTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.jdbc;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.store.jdbc.model.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-import org.springframework.beans.*;
-import org.springframework.beans.factory.xml.*;
-import org.springframework.context.support.*;
-import org.springframework.core.io.*;
-
-import javax.cache.configuration.*;
-import java.io.*;
-import java.net.*;
-import java.sql.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-
-/**
- * Base class for multithreaded {@link JdbcCacheStore} tests that run against an in-memory H2 database.
- */
-public abstract class AbstractJdbcCacheStoreMultithreadedSelfTest<T extends JdbcCacheStore> extends GridCommonAbstractTest {
-    /** Default connection URL (value is <tt>jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1</tt>). */
-    protected static final String DFLT_CONN_URL = "jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1";
-
-    /** IP finder. */
-    protected static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** Number of {@code putAll} iterations performed by each test thread. */
-    private static final int TX_CNT = 1000;
-
-    /** Upper bound (exclusive) on the number of entries generated for a single {@code putAll} batch. */
-    private static final int BATCH_CNT = 2000;
-
-    /** Cache store. */
-    protected T store;
-
-    /** {@inheritDoc} Creates the store under test via {@link #store()}. */
-    @Override protected void beforeTestsStarted() throws Exception {
-        store = store();
-    }
-
-    /** {@inheritDoc} Drops and recreates the Organization and Person tables and their indexes in H2. */
-    @Override protected void beforeTest() throws Exception {
-        Class.forName("org.h2.Driver");
-        Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
-
-        Statement stmt = conn.createStatement();
-
-        stmt.executeUpdate("DROP TABLE IF EXISTS Organization");
-        stmt.executeUpdate("DROP TABLE IF EXISTS Person");
-
-        stmt.executeUpdate("CREATE TABLE Organization (id integer PRIMARY KEY, name varchar(50), city varchar(50))");
-        stmt.executeUpdate("CREATE TABLE Person (id integer PRIMARY KEY, org_id integer, name varchar(50))");
-
-        stmt.executeUpdate("CREATE INDEX Org_Name_IDX On Organization (name)");
-        stmt.executeUpdate("CREATE INDEX Org_Name_City_IDX On Organization (name, city)");
-        stmt.executeUpdate("CREATE INDEX Person_Name_IDX1 On Person (name)");
-        stmt.executeUpdate("CREATE INDEX Person_Name_IDX2 On Person (name desc)");
-
-        conn.commit();
-
-        U.closeQuiet(stmt);
-
-        U.closeQuiet(conn);
-    }
-
-    /** {@inheritDoc} Stops all grids started by the test. */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-    }
-
-    /**
-     * @return New store.
-     * @throws Exception In case of error.
-     */
-    protected abstract T store() throws Exception;
-
-    /** {@inheritDoc} Sets up a PARTITIONED, ATOMIC cache with Spring-loaded type metadata and the store. */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration c = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setIpFinder(IP_FINDER);
-
-        c.setDiscoverySpi(disco);
-
-        CacheConfiguration cc = defaultCacheConfiguration();
-
-        cc.setCacheMode(PARTITIONED);
-        cc.setAtomicityMode(ATOMIC);
-        cc.setSwapEnabled(false);
-        cc.setWriteBehindEnabled(false);
-
-        UrlResource metaUrl;
-
-        try {
-            metaUrl = new UrlResource(new File("modules/core/src/test/config/store/jdbc/Ignite.xml").toURI().toURL());
-        }
-        catch (MalformedURLException e) {
-            throw new IgniteCheckedException("Failed to resolve metadata path [err=" + e.getMessage() + ']', e);
-        }
-
-        try {
-            GenericApplicationContext springCtx = new GenericApplicationContext();
-
-            new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(metaUrl);
-
-            springCtx.refresh();
-
-            Collection<CacheTypeMetadata> tp = springCtx.getBeansOfType(CacheTypeMetadata.class).values();
-
-            cc.setTypeMetadata(tp);
-        }
-        catch (BeansException e) {
-            if (X.hasCause(e, ClassNotFoundException.class))
-                throw new IgniteCheckedException("Failed to instantiate Spring XML application context " +
-                    "(make sure all classes used in Spring configuration are present at CLASSPATH) " +
-                    "[springUrl=" + metaUrl + ']', e);
-            else
-                throw new IgniteCheckedException("Failed to instantiate Spring XML application context [springUrl=" +
-                    metaUrl + ", err=" + e.getMessage() + ']', e);
-        }
-
-        cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
-        cc.setReadThrough(true);
-        cc.setWriteThrough(true);
-        cc.setLoadPreviousValue(true);
-
-        c.setCacheConfiguration(cc);
-
-        return c;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testMultithreadedPutAll() throws Exception {
-        startGrid();
-
-        multithreaded(new Callable<Object>() {
-            private final Random rnd = new Random();
-
-            @Nullable @Override public Object call() throws Exception {
-                for (int i = 0; i < TX_CNT; i++) {
-                    int cnt = rnd.nextInt(BATCH_CNT);
-
-                    Map<Object, Object> map = U.newHashMap(cnt);
-
-                    for (int j = 0; j < cnt; j++) {
-                        int id = rnd.nextInt();
-
-                        if (rnd.nextBoolean())
-                            map.put(new OrganizationKey(id), new Organization(id, "Name" + id, "City" + id));
-                        else
-                            map.put(new PersonKey(id), new Person(id, rnd.nextInt(), "Name" + id));
-                    }
-
-                    GridCache<Object, Object> cache = cache();
-
-                    cache.putAll(map);
-                }
-
-                return null;
-            }
-        }, 8, "putAll");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f55d1983/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreMultitreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreMultitreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreMultitreadedSelfTest.java
new file mode 100644
index 0000000..c9462c8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreMultitreadedSelfTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.h2.jdbcx.*;
+
+/**
+ * Multithreaded test for {@link CacheJdbcPojoStore}.
+ */
+public class CacheJdbcPojoStoreMultitreadedSelfTest
+    extends CacheJdbcStoreAbstractMultithreadedSelfTest<CacheJdbcPojoStore> {
+    /** {@inheritDoc} Creates a {@link CacheJdbcPojoStore} backed by an H2 connection pool on {@code DFLT_CONN_URL}. */
+    @Override protected CacheJdbcPojoStore store() throws Exception {
+        CacheJdbcPojoStore store = new CacheJdbcPojoStore();
+
+        store.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", ""));
+
+        return store;
+    }
+}


Mime
View raw message