ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [23/34] incubator-ignite git commit: IGNITE-891 - Cache store improvements
Date Tue, 02 Jun 2015 03:51:17 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java
new file mode 100644
index 0000000..81736cd
--- /dev/null
+++ b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListener.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.spring;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.transactions.*;
+import org.springframework.jdbc.core.*;
+import org.springframework.jdbc.datasource.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.support.*;
+
+import javax.cache.integration.*;
+import javax.sql.*;
+
+/**
+ * Cache store session listener based on Spring transaction management.
+ * <p>
+ * This listener starts a new DB transaction for each session and commits
+ * or rolls it back when session ends. If there is no ongoing
+ * cache transaction, this listener is no-op.
+ * <p>
+ * Store implementation can use any Spring APIs like {@link JdbcTemplate}
+ * and others. The listener will guarantee that if there is an
+ * ongoing cache transaction, all store operations within this
+ * transaction will be automatically enlisted in the same database
+ * transaction.
+ * <p>
+ * {@link CacheSpringStoreSessionListener} requires that either
+ * {@link #setTransactionManager(PlatformTransactionManager) transaction manager}
+ * or {@link #setDataSource(DataSource) data source} is configured. If none of them is
+ * provided, exception is thrown. If both are provided, data source will be
+ * ignored.
+ * <p>
+ * If there is a transaction, a {@link TransactionStatus} object will be saved
+ * as a store session {@link CacheStoreSession#attachment() attachment}. It
+ * can be used to acquire current DB transaction status.
+ */
+public class CacheSpringStoreSessionListener implements CacheStoreSessionListener, LifecycleAware {
+    /** Transaction manager. */
+    private PlatformTransactionManager txMgr;
+
+    /** Data source. */
+    private DataSource dataSrc;
+
+    /** Propagation behavior. */
+    private int propagation = TransactionDefinition.PROPAGATION_REQUIRED;
+
+    /** Logger. */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /**
+     * Sets transaction manager.
+     * <p>
+     * Either transaction manager or data source is required.
+     * If none is provided, exception will be thrown on startup.
+     *
+     * @param txMgr Transaction manager.
+     */
+    public void setTransactionManager(PlatformTransactionManager txMgr) {
+        this.txMgr = txMgr;
+    }
+
+    /**
+     * Gets transaction manager.
+     *
+     * @return Transaction manager.
+     */
+    public PlatformTransactionManager getTransactionManager() {
+        return txMgr;
+    }
+
+    /**
+     * Sets data source.
+     * <p>
+     * Either transaction manager or data source is required.
+     * If none is provided, exception will be thrown on startup.
+     *
+     * @param dataSrc Data source.
+     */
+    public void setDataSource(DataSource dataSrc) {
+        this.dataSrc = dataSrc;
+    }
+
+    /**
+     * Gets data source.
+     *
+     * @return Data source.
+     */
+    public DataSource getDataSource() {
+        return dataSrc;
+    }
+
+    /**
+     * Sets propagation behavior.
+     * <p>
+     * This parameter is optional.
+     *
+     * @param propagation Propagation behavior.
+     */
+    public void setPropagationBehavior(int propagation) {
+        this.propagation = propagation;
+    }
+
+    /**
+     * Gets propagation behavior.
+     *
+     * @return Propagation behavior.
+     */
+    public int getPropagationBehavior() {
+        return propagation;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteException {
+        if (txMgr == null && dataSrc == null)
+            throw new IgniteException("Either transaction manager or data source is required by " +
+                getClass().getSimpleName() + '.');
+
+        if (dataSrc != null) {
+            if (txMgr == null)
+                txMgr = new DataSourceTransactionManager(dataSrc);
+            else
+                U.warn(log, "Data source configured in " + getClass().getSimpleName() +
+                    " will be ignored (transaction manager is already set).");
+        }
+
+        assert txMgr != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws IgniteException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionStart(CacheStoreSession ses) {
+        if (ses.isWithinTransaction()) {
+            try {
+                TransactionDefinition def = definition(ses.transaction(), ses.cacheName());
+
+                ses.attach(txMgr.getTransaction(def));
+            }
+            catch (TransactionException e) {
+                throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+        if (ses.isWithinTransaction()) {
+            TransactionStatus tx = ses.attachment();
+
+            if (tx != null) {
+                ses.attach(null);
+
+                try {
+                    if (commit)
+                        txMgr.commit(tx);
+                    else
+                        txMgr.rollback(tx);
+                }
+                catch (TransactionException e) {
+                    throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds DB transaction definition (name, isolation, propagation, timeout) for ongoing cache transaction.
+     *
+     * @return DB transaction definition.
+     */
+    private TransactionDefinition definition(Transaction tx, String cacheName) {
+        assert tx != null;
+
+        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
+
+        def.setName("Ignite Tx [cache=" + (cacheName != null ? cacheName : "<default>") + ", id=" + tx.xid() + ']');
+        def.setIsolationLevel(isolationLevel(tx.isolation()));
+        def.setPropagationBehavior(propagation);
+
+        long timeoutSec = (tx.timeout() + 500) / 1000;
+
+        if (timeoutSec > 0 && timeoutSec < Integer.MAX_VALUE)
+            def.setTimeout((int)timeoutSec);
+
+        return def;
+    }
+
+    /**
+     * Gets DB transaction isolation level based on ongoing cache transaction isolation.
+     *
+     * @param isolation Cache transaction isolation.
+     * @return DB transaction isolation.
+     */
+    private int isolationLevel(TransactionIsolation isolation) {
+        switch (isolation) {
+            case READ_COMMITTED:
+                return TransactionDefinition.ISOLATION_READ_COMMITTED;
+
+            case REPEATABLE_READ:
+                return TransactionDefinition.ISOLATION_REPEATABLE_READ;
+
+            case SERIALIZABLE:
+                return TransactionDefinition.ISOLATION_SERIALIZABLE;
+
+            default:
+                throw new IllegalStateException(); // Will never happen.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java b/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
deleted file mode 100644
index e5201ba..0000000
--- a/modules/spring/src/main/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListener.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.spring;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lifecycle.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.transactions.*;
-import org.springframework.jdbc.core.*;
-import org.springframework.jdbc.datasource.*;
-import org.springframework.transaction.*;
-import org.springframework.transaction.support.*;
-
-import javax.cache.integration.*;
-import javax.sql.*;
-
-/**
- * Cache store session listener based on Spring transaction management.
- * <p>
- * This listener starts a new DB transaction for each session and commits
- * or rolls it back when session ends. If there is no ongoing
- * cache transaction, this listener is no-op.
- * <p>
- * Store implementation can use any Spring APIs like {@link JdbcTemplate}
- * and others. The listener will guarantee that if there is an
- * ongoing cache transaction, all store operations within this
- * transaction will be automatically enlisted in the same database
- * transaction.
- * <p>
- * {@link CacheStoreSessionSpringListener} requires that either
- * {@link #setTransactionManager(PlatformTransactionManager) transaction manager}
- * or {@link #setDataSource(DataSource) data source} is configured. If non of them is
- * provided, exception is thrown. Is both are provided, data source will be
- * ignored.
- * <p>
- * If there is a transaction, a {@link TransactionStatus} object will be stored
- * in store session {@link CacheStoreSession#properties() properties} and can be
- * accessed at any moment by {@link #TX_STATUS_KEY} key. This can be used to
- * acquire current DB transaction status.
- */
-public class CacheStoreSessionSpringListener implements CacheStoreSessionListener, LifecycleAware {
-    /** Session key for transaction status. */
-    public static final String TX_STATUS_KEY = "__spring_tx_status_";
-
-    /** Transaction manager. */
-    private PlatformTransactionManager txMgr;
-
-    /** Data source. */
-    private DataSource dataSrc;
-
-    /** Propagation behavior. */
-    private int propagation = TransactionDefinition.PROPAGATION_REQUIRED;
-
-    /** Logger. */
-    @LoggerResource
-    private IgniteLogger log;
-
-    /**
-     * Sets transaction manager.
-     * <p>
-     * Either transaction manager or data source is required.
-     * If none is provided, exception will be thrown on startup.
-     *
-     * @param txMgr Transaction manager.
-     */
-    public void setTransactionManager(PlatformTransactionManager txMgr) {
-        this.txMgr = txMgr;
-    }
-
-    /**
-     * Gets transaction manager.
-     *
-     * @return Transaction manager.
-     */
-    public PlatformTransactionManager getTransactionManager() {
-        return txMgr;
-    }
-
-    /**
-     * Sets data source.
-     * <p>
-     * Either transaction manager or data source is required.
-     * If none is provided, exception will be thrown on startup.
-     *
-     * @param dataSrc Data source.
-     */
-    public void setDataSource(DataSource dataSrc) {
-        this.dataSrc = dataSrc;
-    }
-
-    /**
-     * Gets data source.
-     *
-     * @return Data source.
-     */
-    public DataSource getDataSource() {
-        return dataSrc;
-    }
-
-    /**
-     * Sets propagation behavior.
-     * <p>
-     * This parameter is optional.
-     *
-     * @param propagation Propagation behavior.
-     */
-    public void setPropagationBehavior(int propagation) {
-        this.propagation = propagation;
-    }
-
-    /**
-     * Gets propagation behavior.
-     *
-     * @return Propagation behavior.
-     */
-    public int getPropagationBehavior() {
-        return propagation;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteException {
-        if (txMgr == null && dataSrc == null)
-            throw new IgniteException("Either transaction manager or data source is required by " +
-                getClass().getSimpleName() + '.');
-
-        if (dataSrc != null) {
-            if (txMgr == null)
-                txMgr = new DataSourceTransactionManager(dataSrc);
-            else
-                U.warn(log, "Data source configured in " + getClass().getSimpleName() +
-                    " will be ignored (transaction manager is already set).");
-        }
-
-        assert txMgr != null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop() throws IgniteException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onSessionStart(CacheStoreSession ses) {
-        if (ses.isWithinTransaction()) {
-            try {
-                TransactionDefinition def = definition(ses.transaction(), ses.cacheName());
-
-                ses.properties().put(TX_STATUS_KEY, txMgr.getTransaction(def));
-            }
-            catch (TransactionException e) {
-                throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
-        if (ses.isWithinTransaction()) {
-            TransactionStatus tx = ses.<String, TransactionStatus>properties().remove(TX_STATUS_KEY);
-
-            if (tx != null) {
-                try {
-                    if (commit)
-                        txMgr.commit(tx);
-                    else
-                        txMgr.rollback(tx);
-                }
-                catch (TransactionException e) {
-                    throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e);
-                }
-            }
-        }
-    }
-
-    /**
-     * Gets DB transaction isolation level based on ongoing cache transaction isolation.
-     *
-     * @return DB transaction isolation.
-     */
-    private TransactionDefinition definition(Transaction tx, String cacheName) {
-        assert tx != null;
-
-        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
-
-        def.setName("Ignite Tx [cache=" + (cacheName != null ? cacheName : "<default>") + ", id=" + tx.xid() + ']');
-        def.setIsolationLevel(isolationLevel(tx.isolation()));
-        def.setPropagationBehavior(propagation);
-
-        long timeoutSec = (tx.timeout() + 500) / 1000;
-
-        if (timeoutSec > 0 && timeoutSec < Integer.MAX_VALUE)
-            def.setTimeout((int)timeoutSec);
-
-        return def;
-    }
-
-    /**
-     * Gets DB transaction isolation level based on ongoing cache transaction isolation.
-     *
-     * @param isolation Cache transaction isolation.
-     * @return DB transaction isolation.
-     */
-    private int isolationLevel(TransactionIsolation isolation) {
-        switch (isolation) {
-            case READ_COMMITTED:
-                return TransactionDefinition.ISOLATION_READ_COMMITTED;
-
-            case REPEATABLE_READ:
-                return TransactionDefinition.ISOLATION_REPEATABLE_READ;
-
-            case SERIALIZABLE:
-                return TransactionDefinition.ISOLATION_SERIALIZABLE;
-
-            default:
-                throw new IllegalStateException(); // Will never happen.
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListenerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListenerSelfTest.java
new file mode 100644
index 0000000..74f5c69
--- /dev/null
+++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheSpringStoreSessionListenerSelfTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.spring;
+
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.springframework.jdbc.core.*;
+import org.springframework.jdbc.datasource.*;
+import org.springframework.transaction.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import javax.sql.*;
+import java.sql.*;
+import java.util.*;
+
+/**
+ * Tests for {@link CacheSpringStoreSessionListener}.
+ */
+public class CacheSpringStoreSessionListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest {
+    /** */
+    private static final DataSource DATA_SRC = new DriverManagerDataSource(URL);
+
+    /** {@inheritDoc} */
+    @Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory() {
+        return new Factory<CacheStore<Integer, Integer>>() {
+            @Override public CacheStore<Integer, Integer> create() {
+                return new Store(new JdbcTemplate(DATA_SRC));
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Factory<CacheStoreSessionListener> sessionListenerFactory() {
+        return new Factory<CacheStoreSessionListener>() {
+            @Override public CacheStoreSessionListener create() {
+                CacheSpringStoreSessionListener lsnr = new CacheSpringStoreSessionListener();
+
+                lsnr.setDataSource(DATA_SRC);
+
+                return lsnr;
+            }
+        };
+    }
+
+    /**
+     */
+    private static class Store extends CacheStoreAdapter<Integer, Integer> {
+        /** */
+        private static String SES_CONN_KEY = "ses_conn";
+
+        /** */
+        private final JdbcTemplate jdbc;
+
+        /** */
+        @CacheStoreSessionResource
+        private CacheStoreSession ses;
+
+        /**
+         * @param jdbc JDBC template.
+         */
+        private Store(JdbcTemplate jdbc) {
+            this.jdbc = jdbc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, Object... args) {
+            loadCacheCnt.incrementAndGet();
+
+            checkTransaction();
+            checkConnection();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer load(Integer key) throws CacheLoaderException {
+            loadCnt.incrementAndGet();
+
+            checkTransaction();
+            checkConnection();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry)
+            throws CacheWriterException {
+            writeCnt.incrementAndGet();
+
+            checkTransaction();
+            checkConnection();
+
+            if (write.get()) {
+                String table;
+
+                switch (ses.cacheName()) {
+                    case "cache1":
+                        table = "Table1";
+
+                        break;
+
+                    case "cache2":
+                        if (fail.get())
+                            throw new CacheWriterException("Expected failure.");
+
+                        table = "Table2";
+
+                        break;
+
+                    default:
+                        throw new CacheWriterException("Wring cache: " + ses.cacheName());
+                }
+
+                jdbc.update("INSERT INTO " + table + " (key, value) VALUES (?, ?)",
+                    entry.getKey(), entry.getValue());
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            deleteCnt.incrementAndGet();
+
+            checkTransaction();
+            checkConnection();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sessionEnd(boolean commit) {
+            assertNull(ses.attachment());
+        }
+
+        /**
+         */
+        private void checkTransaction() {
+            TransactionStatus tx = ses.attachment();
+
+            if (ses.isWithinTransaction()) {
+                assertNotNull(tx);
+                assertFalse(tx.isCompleted());
+            }
+            else
+                assertNull(tx);
+        }
+
+        /**
+         */
+        private void checkConnection() {
+            Connection conn = DataSourceUtils.getConnection(jdbc.getDataSource());
+
+            assertNotNull(conn);
+
+            try {
+                assertFalse(conn.isClosed());
+                assertEquals(!ses.isWithinTransaction(), conn.getAutoCommit());
+            }
+            catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+
+            verifySameInstance(conn);
+        }
+
+        /**
+         * @param conn Connection.
+         */
+        private void verifySameInstance(Connection conn) {
+            Map<String, Connection> props = ses.properties();
+
+            Connection sesConn = props.get(SES_CONN_KEY);
+
+            if (sesConn == null)
+                props.put(SES_CONN_KEY, conn);
+            else {
+                assertSame(conn, sesConn);
+
+                reuseCnt.incrementAndGet();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
deleted file mode 100644
index 83ed249..0000000
--- a/modules/spring/src/test/java/org/apache/ignite/cache/store/spring/CacheStoreSessionSpringListenerSelfTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.spring;
-
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.cache.store.jdbc.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.springframework.jdbc.core.*;
-import org.springframework.jdbc.datasource.*;
-import org.springframework.transaction.*;
-
-import javax.cache.*;
-import javax.cache.configuration.*;
-import javax.cache.integration.*;
-import javax.sql.*;
-import java.sql.*;
-import java.util.*;
-
-/**
- * Tests for {@link CacheStoreSessionJdbcListener}.
- */
-public class CacheStoreSessionSpringListenerSelfTest extends CacheStoreSessionListenerAbstractSelfTest {
-    /** */
-    private static final DataSource DATA_SRC = new DriverManagerDataSource(URL);
-
-    /** {@inheritDoc} */
-    @Override protected Factory<? extends CacheStore<Integer, Integer>> storeFactory() {
-        return new Factory<CacheStore<Integer, Integer>>() {
-            @Override public CacheStore<Integer, Integer> create() {
-                return new Store(new JdbcTemplate(DATA_SRC));
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Factory<CacheStoreSessionListener> sessionListenerFactory() {
-        return new Factory<CacheStoreSessionListener>() {
-            @Override public CacheStoreSessionListener create() {
-                CacheStoreSessionSpringListener lsnr = new CacheStoreSessionSpringListener();
-
-                lsnr.setDataSource(DATA_SRC);
-
-                return lsnr;
-            }
-        };
-    }
-
-    /**
-     */
-    private static class Store extends CacheStoreAdapter<Integer, Integer> {
-        /** */
-        private static String SES_CONN_KEY = "ses_conn";
-
-        /** */
-        private final JdbcTemplate jdbc;
-
-        /** */
-        @CacheStoreSessionResource
-        private CacheStoreSession ses;
-
-        /**
-         * @param jdbc JDBC template.
-         */
-        private Store(JdbcTemplate jdbc) {
-            this.jdbc = jdbc;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, Object... args) {
-            loadCacheCnt.incrementAndGet();
-
-            checkTransaction();
-            checkConnection();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Integer load(Integer key) throws CacheLoaderException {
-            loadCnt.incrementAndGet();
-
-            checkTransaction();
-            checkConnection();
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry)
-            throws CacheWriterException {
-            writeCnt.incrementAndGet();
-
-            checkTransaction();
-            checkConnection();
-
-            if (write.get()) {
-                String table;
-
-                switch (ses.cacheName()) {
-                    case "cache1":
-                        table = "Table1";
-
-                        break;
-
-                    case "cache2":
-                        if (fail.get())
-                            throw new CacheWriterException("Expected failure.");
-
-                        table = "Table2";
-
-                        break;
-
-                    default:
-                        throw new CacheWriterException("Wring cache: " + ses.cacheName());
-                }
-
-                jdbc.update("INSERT INTO " + table + " (key, value) VALUES (?, ?)",
-                    entry.getKey(), entry.getValue());
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void delete(Object key) throws CacheWriterException {
-            deleteCnt.incrementAndGet();
-
-            checkTransaction();
-            checkConnection();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void sessionEnd(boolean commit) {
-            assertNull(transaction());
-        }
-
-        /**
-         */
-        private void checkTransaction() {
-            TransactionStatus tx = transaction();
-
-            if (ses.isWithinTransaction()) {
-                assertNotNull(tx);
-                assertFalse(tx.isCompleted());
-            }
-            else
-                assertNull(tx);
-        }
-
-        /**
-         * @return Transaction status.
-         */
-        private TransactionStatus transaction() {
-            return ses.<String, TransactionStatus>properties().get(CacheStoreSessionSpringListener.TX_STATUS_KEY);
-        }
-
-        /**
-         */
-        private void checkConnection() {
-            Connection conn = DataSourceUtils.getConnection(jdbc.getDataSource());
-
-            assertNotNull(conn);
-
-            try {
-                assertFalse(conn.isClosed());
-                assertEquals(!ses.isWithinTransaction(), conn.getAutoCommit());
-            }
-            catch (SQLException e) {
-                throw new RuntimeException(e);
-            }
-
-            verifySameInstance(conn);
-        }
-
-        /**
-         * @param conn Connection.
-         */
-        private void verifySameInstance(Connection conn) {
-            Map<String, Connection> props = ses.properties();
-
-            Connection sesConn = props.get(SES_CONN_KEY);
-
-            if (sesConn == null)
-                props.put(SES_CONN_KEY, conn);
-            else {
-                assertSame(conn, sesConn);
-
-                reuseCnt.incrementAndGet();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/990bf9e3/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
index 0b7e471..12dd494 100644
--- a/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
+++ b/modules/spring/src/test/java/org/apache/ignite/testsuites/IgniteSpringTestSuite.java
@@ -48,7 +48,7 @@ public class IgniteSpringTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(IgniteStartFromStreamConfigurationTest.class));
 
-        suite.addTestSuite(CacheStoreSessionSpringListenerSelfTest.class);
+        suite.addTestSuite(CacheSpringStoreSessionListenerSelfTest.class);
 
         return suite;
     }


Mime
View raw message