# ignite-42
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4b8ec5f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4b8ec5f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4b8ec5f2
Branch: refs/heads/ignite-42
Commit: 4b8ec5f230456293084d9f2224bd5bda90520a81
Parents: 806ce6a
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Jan 15 09:43:58 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Jan 15 17:42:04 2015 +0300
----------------------------------------------------------------------
.../store/dummy/CacheDummyPersonStore.java | 20 +-
.../hibernate/CacheHibernatePersonStore.java | 55 +-
.../store/jdbc/CacheJdbcPersonStore.java | 50 +-
.../org/gridgain/client/GridHashMapStore.java | 16 +-
.../integration/GridClientAbstractSelfTest.java | 16 +-
.../java/org/apache/ignite/IgniteCache.java | 39 +-
.../cache/store/CacheLoadOnlyStoreAdapter.java | 325 +++++++++++
.../ignite/cache/store/CacheLocalStore.java | 31 +
.../apache/ignite/cache/store/CacheStore.java | 154 +++++
.../ignite/cache/store/CacheStoreAdapter.java | 116 ++++
.../cache/store/CacheStoreBalancingWrapper.java | 293 ++++++++++
.../cache/store/jdbc/CacheJdbcBlobStore.java | 573 +++++++++++++++++++
.../apache/ignite/cache/store/jdbc/package.html | 24 +
.../org/apache/ignite/cache/store/package.html | 23 +
.../java/org/gridgain/grid/cache/GridCache.java | 13 +-
.../grid/cache/GridCacheConfiguration.java | 16 +-
.../grid/cache/GridCacheProjection.java | 143 +++--
.../store/GridCacheLoadOnlyStoreAdapter.java | 328 -----------
.../grid/cache/store/GridCacheLocalStore.java | 31 -
.../grid/cache/store/GridCacheStore.java | 220 -------
.../grid/cache/store/GridCacheStoreAdapter.java | 113 ----
.../store/GridCacheStoreBalancingWrapper.java | 278 ---------
.../store/jdbc/GridCacheJdbcBlobStore.java | 552 ------------------
.../gridgain/grid/cache/store/jdbc/package.html | 24 -
.../org/gridgain/grid/cache/store/package.html | 23 -
.../kernal/processors/cache/CacheEntryImpl.java | 60 ++
.../cache/GridCacheLoaderWriterStore.java | 121 +---
.../processors/cache/GridCacheProcessor.java | 6 +-
.../processors/cache/GridCacheProjectionEx.java | 17 +-
.../processors/cache/GridCacheStoreManager.java | 92 ++-
.../cache/GridCacheWriteBehindStore.java | 74 +--
...CacheJdbcBlobStoreMultithreadedSelfTest.java | 243 ++++++++
.../jdbc/GridCacheJdbcBlobStoreSelfTest.java | 51 ++
.../apache/ignite/cache/store/jdbc/package.html | 23 +
.../cache/IgniteCacheAbstractTest.java | 12 +-
...niteCacheAtomicLocalWithStoreInvokeTest.java | 4 +-
...micPrimaryWriteOrderWithStoreInvokeTest.java | 4 +-
...maryWriteOrderWithStoreExpiryPolicyTest.java | 4 +-
...iteCacheAtomicWithStoreExpiryPolicyTest.java | 4 +-
.../IgniteCacheTxWithStoreExpiryPolicyTest.java | 4 +-
.../store/GridCacheBalancingStoreSelfTest.java | 43 +-
.../GridCacheLoadOnlyStoreAdapterSelfTest.java | 6 +-
.../cache/store/GridGeneratingTestStore.java | 38 +-
...CacheJdbcBlobStoreMultithreadedSelfTest.java | 243 --------
.../jdbc/GridCacheJdbcBlobStoreSelfTest.java | 51 --
.../gridgain/grid/cache/store/jdbc/package.html | 23 -
.../cache/GridCacheAbstractFlagsTest.java | 4 +-
.../cache/GridCacheAbstractSelfTest.java | 12 +-
.../cache/GridCacheBasicStoreAbstractTest.java | 4 +-
...acheBasicStoreMultithreadedAbstractTest.java | 12 +-
...idCacheConfigurationConsistencySelfTest.java | 31 +-
.../cache/GridCacheGenericTestStore.java | 77 +--
.../GridCacheGroupLockAbstractSelfTest.java | 16 +-
.../cache/GridCacheLifecycleAwareSelfTest.java | 25 +-
...ridCacheMultinodeUpdateAbstractSelfTest.java | 4 +-
.../cache/GridCachePartitionedWritesTest.java | 13 +-
.../cache/GridCacheReloadSelfTest.java | 11 +-
.../cache/GridCacheStorePutxSelfTest.java | 30 +-
.../cache/GridCacheSwapReloadSelfTest.java | 12 +-
.../processors/cache/GridCacheTestStore.java | 109 ++--
...idCacheWriteBehindStoreAbstractSelfTest.java | 13 +-
.../GridCacheWriteBehindStoreSelfTest.java | 50 +-
.../IgniteTxStoreExceptionAbstractSelfTest.java | 36 +-
...CacheAtomicReferenceApiSelfAbstractTest.java | 4 +-
...chePartitionedReloadAllAbstractSelfTest.java | 12 +-
.../dht/GridCacheColocatedDebugTest.java | 6 +-
.../dht/GridCacheGlobalLoadTest.java | 15 +-
.../near/GridCacheGetStoreErrorSelfTest.java | 15 +-
.../near/GridCacheNearMultiNodeSelfTest.java | 11 +-
.../near/GridCacheNearOneNodeSelfTest.java | 11 +-
.../GridCacheNearPartitionedClearSelfTest.java | 4 +-
.../GridCachePartitionedLoadCacheSelfTest.java | 13 +-
.../GridCachePartitionedStorePutSelfTest.java | 12 +-
.../near/GridPartitionedBackupLoadSelfTest.java | 14 +-
.../GridCacheBatchEvictUnswapSelfTest.java | 13 +-
.../GridCacheEmptyEntriesAbstractSelfTest.java | 15 +-
.../GridCacheEvictionTouchSelfTest.java | 11 +-
...dCacheAtomicLocalMetricsNoStoreSelfTest.java | 4 +-
.../local/GridCacheLocalLoadAllSelfTest.java | 15 +-
...ridCacheContinuousQueryAbstractSelfTest.java | 15 +-
.../GridCacheWriteBehindStoreLoadTest.java | 11 +-
.../colocation/GridTestCacheStore.java | 22 +-
.../swap/GridSwapEvictAllBenchmark.java | 13 +-
.../testframework/junits/GridTestResources.java | 7 +
.../cache/GridAbstractCacheStoreSelfTest.java | 174 +++---
.../junits/cache/TestCacheSession.java | 59 ++
.../cache/TestThreadLocalCacheSession.java | 57 ++
.../bamboo/GridDataGridTestSuite.java | 2 +-
.../hibernate/GridCacheHibernateBlobStore.java | 67 ++-
.../GridCacheHibernateBlobStoreSelfTest.java | 6 +-
.../cache/GridCacheAbstractQuerySelfTest.java | 12 +-
.../cache/GridCacheQueryLoadSelfTest.java | 16 +-
92 files changed, 2931 insertions(+), 2763 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java b/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java
index 48390ac..f4b6553 100644
--- a/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java
+++ b/examples/src/main/java/org/gridgain/examples/datagrid/store/dummy/CacheDummyPersonStore.java
@@ -18,12 +18,12 @@
package org.gridgain.examples.datagrid.store.dummy;
import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;
import org.gridgain.examples.datagrid.store.*;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.store.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -32,41 +32,47 @@ import java.util.concurrent.*;
/**
* Dummy cache store implementation.
*/
-public class CacheDummyPersonStore extends GridCacheStoreAdapter<Long, Person> {
+public class CacheDummyPersonStore extends CacheStoreAdapter<Long, Person> {
/** Auto-inject grid instance. */
@IgniteInstanceResource
private Ignite ignite;
/** Auto-inject cache name. */
- @GridCacheName
+ @IgniteCacheNameResource
private String cacheName;
/** Dummy database. */
private Map<Long, Person> dummyDB = new ConcurrentHashMap<>();
/** {@inheritDoc} */
- @Override public Person load(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException {
+ @Override public Person load(Long key) {
+ IgniteTx tx = transaction();
+
System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');
return dummyDB.get(key);
}
/** {@inheritDoc} */
- @Override public void put(@Nullable IgniteTx tx, Long key, Person val) throws IgniteCheckedException {
+ @Override public void put(Long key, Person val) {
+ IgniteTx tx = transaction();
+
System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']');
dummyDB.put(key, val);
}
/** {@inheritDoc} */
- @Override public void remove(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException {
+ @Override public void remove(Long key) {
+ IgniteTx tx = transaction();
+
System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');
dummyDB.remove(key);
}
/** {@inheritDoc} */
- @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) throws IgniteCheckedException {
+ @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
int cnt = (Integer)args[0];
System.out.println(">>> Store loadCache for entry count: " + cnt);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java b/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
index cc0bbc1..c671108 100644
--- a/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
+++ b/examples/src/main/java/org/gridgain/examples/datagrid/store/hibernate/CacheHibernatePersonStore.java
@@ -17,22 +17,22 @@
package org.gridgain.examples.datagrid.store.hibernate;
-import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
import org.gridgain.examples.datagrid.store.*;
-import org.gridgain.grid.cache.store.*;
import org.hibernate.*;
import org.hibernate.cfg.*;
import org.jetbrains.annotations.*;
+import javax.cache.integration.*;
import java.util.*;
/**
- * Example of {@link GridCacheStore} implementation that uses Hibernate
+ * Example of {@link CacheStore} implementation that uses Hibernate
* and deals with maps {@link UUID} to {@link Person}.
*/
-public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Person> {
+public class CacheHibernatePersonStore extends CacheStoreAdapter<Long, Person> {
/** Default hibernate configuration resource path. */
private static final String DFLT_HIBERNATE_CFG = "/org/gridgain/examples/datagrid/store/hibernate/hibernate.cfg.xml";
@@ -50,7 +50,9 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
}
/** {@inheritDoc} */
- @Override public Person load(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException {
+ @Override public Person load(Long key) {
+ IgniteTx tx = transaction();
+
System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');
Session ses = session(tx);
@@ -61,7 +63,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
catch (HibernateException e) {
rollback(ses, tx);
- throw new IgniteCheckedException("Failed to load value from cache store with key: " + key, e);
+ throw new CacheLoaderException("Failed to load value from cache store with key: " + key, e);
}
finally {
end(ses, tx);
@@ -69,12 +71,13 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
}
/** {@inheritDoc} */
- @Override public void put(@Nullable IgniteTx tx, Long key, @Nullable Person val)
- throws IgniteCheckedException {
+ @Override public void put(Long key, @Nullable Person val) {
+ IgniteTx tx = transaction();
+
System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']');
if (val == null) {
- remove(tx, key);
+ remove(key);
return;
}
@@ -87,7 +90,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
catch (HibernateException e) {
rollback(ses, tx);
- throw new IgniteCheckedException("Failed to put value to cache store [key=" + key + ", val" + val + "]", e);
+ throw new CacheWriterException("Failed to put value to cache store [key=" + key + ", val" + val + "]", e);
}
finally {
end(ses, tx);
@@ -96,7 +99,9 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
/** {@inheritDoc} */
@SuppressWarnings({"JpaQueryApiInspection"})
- @Override public void remove(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException {
+ @Override public void remove(Long key) {
+ IgniteTx tx = transaction();
+
System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');
Session ses = session(tx);
@@ -108,7 +113,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
catch (HibernateException e) {
rollback(ses, tx);
- throw new IgniteCheckedException("Failed to remove value from cache store with key: " + key, e);
+ throw new CacheWriterException("Failed to remove value from cache store with key: " + key, e);
}
finally {
end(ses, tx);
@@ -116,9 +121,9 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
}
/** {@inheritDoc} */
- @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) throws IgniteCheckedException {
+ @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
if (args == null || args.length == 0 || args[0] == null)
- throw new IgniteCheckedException("Expected entry count parameter is not provided.");
+ throw new CacheLoaderException("Expected entry count parameter is not provided.");
final int entryCnt = (Integer)args[0];
@@ -144,7 +149,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
System.out.println(">>> Loaded " + cnt + " values into cache.");
}
catch (HibernateException e) {
- throw new IgniteCheckedException("Failed to load values from cache store.", e);
+ throw new CacheLoaderException("Failed to load values from cache store.", e);
}
finally {
end(ses, null);
@@ -188,8 +193,14 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
}
/** {@inheritDoc} */
- @Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException {
- Session ses = tx.removeMeta(ATTR_SES);
+ @Override public void txEnd(boolean commit) {
+ CacheStoreSession storeSes = session();
+
+ IgniteTx tx = storeSes.transaction();
+
+ Map<Object, Object> props = storeSes.properties();
+
+ Session ses = (Session)props.remove(ATTR_SES);
if (ses != null) {
Transaction hTx = ses.getTransaction();
@@ -207,7 +218,7 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
System.out.println("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']');
}
catch (HibernateException e) {
- throw new IgniteCheckedException("Failed to end transaction [xid=" + tx.xid() +
+ throw new CacheWriterException("Failed to end transaction [xid=" + tx.xid() +
", commit=" + commit + ']', e);
}
finally {
@@ -227,16 +238,18 @@ public class CacheHibernatePersonStore extends GridCacheStoreAdapter<Long, Perso
Session ses;
if (tx != null) {
- ses = tx.meta(ATTR_SES);
+ Map<Object, Object> props = session().properties();
+
+ ses = (Session)props.get(ATTR_SES);
if (ses == null) {
ses = sesFactory.openSession();
ses.beginTransaction();
- // Store session in transaction metadata, so it can be accessed
+ // Store session in session properties, so it can be accessed
// for other operations on the same transaction.
- tx.addMeta(ATTR_SES, ses);
+ props.put(ATTR_SES, ses);
System.out.println("Hibernate session open [ses=" + ses + ", tx=" + tx.xid() + "]");
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java b/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
index 2aeb655..2711dba 100644
--- a/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
+++ b/examples/src/main/java/org/gridgain/examples/datagrid/store/jdbc/CacheJdbcPersonStore.java
@@ -18,21 +18,22 @@
package org.gridgain.examples.datagrid.store.jdbc;
import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
import org.gridgain.examples.datagrid.store.*;
-import org.gridgain.grid.cache.store.*;
import org.jetbrains.annotations.*;
+import javax.cache.integration.*;
import java.sql.*;
import java.util.*;
/**
- * Example of {@link GridCacheStore} implementation that uses JDBC
+ * Example of {@link CacheStore} implementation that uses JDBC
* transaction with cache transactions and maps {@link UUID} to {@link Person}.
*
*/
-public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
+public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
/** Transaction metadata attribute name. */
private static final String ATTR_NAME = "SIMPLE_STORE_CONNECTION";
@@ -64,8 +65,12 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
}
/** {@inheritDoc} */
- @Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException {
- try (Connection conn = tx.removeMeta(ATTR_NAME)) {
+ @Override public void txEnd(boolean commit) {
+ IgniteTx tx = transaction();
+
+ Map<Object, Object> props = session().properties();
+
+ try (Connection conn = (Connection)props.remove(ATTR_NAME)) {
if (conn != null) {
if (commit)
conn.commit();
@@ -76,12 +81,14 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
System.out.println(">>> Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']');
}
catch (SQLException e) {
- throw new IgniteCheckedException("Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e);
+ throw new CacheWriterException("Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e);
}
}
/** {@inheritDoc} */
- @Nullable @Override public Person load(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException {
+ @Nullable @Override public Person load(Long key) {
+ IgniteTx tx = transaction();
+
System.out.println(">>> Store load [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');
Connection conn = null;
@@ -99,7 +106,7 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
}
}
catch (SQLException e) {
- throw new IgniteCheckedException("Failed to load object: " + key, e);
+ throw new CacheLoaderException("Failed to load object: " + key, e);
}
finally {
end(tx, conn);
@@ -109,8 +116,9 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
}
/** {@inheritDoc} */
- @Override public void put(@Nullable IgniteTx tx, Long key, Person val)
- throws IgniteCheckedException {
+ @Override public void put(Long key, Person val) {
+ IgniteTx tx = transaction();
+
System.out.println(">>> Store put [key=" + key + ", val=" + val + ", xid=" + (tx == null ? null : tx.xid()) + ']');
Connection conn = null;
@@ -142,7 +150,7 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
}
}
catch (SQLException e) {
- throw new IgniteCheckedException("Failed to put object [key=" + key + ", val=" + val + ']', e);
+ throw new CacheLoaderException("Failed to put object [key=" + key + ", val=" + val + ']', e);
}
finally {
end(tx, conn);
@@ -150,7 +158,9 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
}
/** {@inheritDoc} */
- @Override public void remove(@Nullable IgniteTx tx, Long key) throws IgniteCheckedException {
+ @Override public void remove(Long key) {
+ IgniteTx tx = transaction();
+
System.out.println(">>> Store remove [key=" + key + ", xid=" + (tx == null ? null : tx.xid()) + ']');
Connection conn = null;
@@ -165,7 +175,7 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
}
}
catch (SQLException e) {
- throw new IgniteCheckedException("Failed to remove object: " + key, e);
+ throw new CacheLoaderException("Failed to remove object: " + key, e);
}
finally {
end(tx, conn);
@@ -173,9 +183,9 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
}
/** {@inheritDoc} */
- @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) throws IgniteCheckedException {
+ @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
if (args == null || args.length == 0 || args[0] == null)
- throw new IgniteCheckedException("Expected entry count parameter is not provided.");
+ throw new CacheLoaderException("Expected entry count parameter is not provided.");
final int entryCnt = (Integer)args[0];
@@ -201,7 +211,7 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
}
}
catch (SQLException e) {
- throw new IgniteCheckedException("Failed to load values from cache store.", e);
+ throw new CacheLoaderException("Failed to load values from cache store.", e);
}
finally {
end(null, conn);
@@ -215,14 +225,16 @@ public class CacheJdbcPersonStore extends GridCacheStoreAdapter<Long, Person> {
*/
private Connection connection(@Nullable IgniteTx tx) throws SQLException {
if (tx != null) {
- Connection conn = tx.meta(ATTR_NAME);
+ Map<Object, Object> props = session().properties();
+
+ Connection conn = (Connection)props.get(ATTR_NAME);
if (conn == null) {
conn = openConnection(false);
- // Store connection in transaction metadata, so it can be accessed
+ // Store connection in session properties, so it can be accessed
// for other operations on the same transaction.
- tx.addMeta(ATTR_NAME, conn);
+ props.put(ATTR_NAME, conn);
}
return conn;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java b/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java
index 67fc50d..5b0fbe7 100644
--- a/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java
+++ b/modules/clients/src/test/java/org/gridgain/client/GridHashMapStore.java
@@ -18,9 +18,9 @@
package org.gridgain.client;
import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
-import org.gridgain.grid.cache.store.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -28,32 +28,28 @@ import java.util.*;
/**
* Simple HashMap based cache store emulation.
*/
-public class GridHashMapStore extends GridCacheStoreAdapter {
+public class GridHashMapStore extends CacheStoreAdapter {
/** Map for cache store. */
private final Map<Object, Object> map = new HashMap<>();
/** {@inheritDoc} */
- @Override public void loadCache(IgniteBiInClosure c, Object... args)
- throws IgniteCheckedException {
+ @Override public void loadCache(IgniteBiInClosure c, Object... args) {
for (Map.Entry e : map.entrySet())
c.apply(e.getKey(), e.getValue());
}
/** {@inheritDoc} */
- @Override public Object load(@Nullable IgniteTx tx, Object key)
- throws IgniteCheckedException {
+ @Override public Object load(Object key) {
return map.get(key);
}
/** {@inheritDoc} */
- @Override public void put(@Nullable IgniteTx tx, Object key,
- @Nullable Object val) throws IgniteCheckedException {
+ @Override public void put(Object key, @Nullable Object val) {
map.put(key, val);
}
/** {@inheritDoc} */
- @Override public void remove(@Nullable IgniteTx tx, Object key)
- throws IgniteCheckedException {
+ @Override public void remove(Object key) {
map.remove(key);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java b/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java
index 58d6894..6e8d1a3 100644
--- a/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/gridgain/client/integration/GridClientAbstractSelfTest.java
@@ -20,6 +20,7 @@ package org.gridgain.client.integration;
import junit.framework.*;
import net.sf.json.*;
import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
@@ -32,7 +33,6 @@ import org.gridgain.client.*;
import org.gridgain.client.ssl.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.affinity.consistenthash.*;
-import org.gridgain.grid.cache.store.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.testframework.junits.common.*;
@@ -1564,33 +1564,29 @@ public abstract class GridClientAbstractSelfTest extends GridCommonAbstractTest
/**
* Simple HashMap based cache store emulation.
*/
- private static class HashMapStore extends GridCacheStoreAdapter<Object, Object> {
+ private static class HashMapStore extends CacheStoreAdapter<Object, Object> {
/** Map for cache store. */
private final Map<Object, Object> map = new HashMap<>();
/** {@inheritDoc} */
- @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args)
- throws IgniteCheckedException {
+ @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) {
for (Map.Entry e : map.entrySet()) {
clo.apply(e.getKey(), e.getValue());
}
}
/** {@inheritDoc} */
- @Override public Object load(@Nullable IgniteTx tx, Object key)
- throws IgniteCheckedException {
+ @Override public Object load(Object key) {
return map.get(key);
}
/** {@inheritDoc} */
- @Override public void put(@Nullable IgniteTx tx, Object key,
- @Nullable Object val) throws IgniteCheckedException {
+ @Override public void put(Object key, @Nullable Object val) {
map.put(key, val);
}
/** {@inheritDoc} */
- @Override public void remove(@Nullable IgniteTx tx, Object key)
- throws IgniteCheckedException {
+ @Override public void remove(Object key) {
map.remove(key);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 2988005..30850e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -22,7 +22,6 @@ import org.apache.ignite.cache.query.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
import org.gridgain.grid.cache.*;
-import org.gridgain.grid.cache.store.*;
import org.jetbrains.annotations.*;
import javax.cache.*;
@@ -95,20 +94,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* @param p Optional predicate (may be {@code null}). If provided, will be used to
* filter values to be put into cache.
* @param args Optional user arguments to be passed into
- * {@link GridCacheStore#loadCache(IgniteBiInClosure, Object...)} method.
+ * {@link org.apache.ignite.cache.store.CacheStore#loadCache(IgniteBiInClosure, Object...)} method.
* @throws CacheException If loading failed.
*/
public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException;
/**
- * Delegates to {@link GridCacheStore#loadCache(IgniteBiInClosure,Object...)} method
+ * Delegates to {@link org.apache.ignite.cache.store.CacheStore#loadCache(IgniteBiInClosure,Object...)} method
* to load state from the underlying persistent storage. The loaded values
* will then be given to the optionally passed in predicate, and, if the predicate returns
* {@code true}, will be stored in cache. If predicate is {@code null}, then
* all loaded values will be stored in cache.
* <p>
* Note that this method does not receive keys as a parameter, so it is up to
- * {@link GridCacheStore} implementation to provide all the data to be loaded.
+ * {@link org.apache.ignite.cache.store.CacheStore} implementation to provide all the data to be loaded.
* <p>
* This method is not transactional and may end up loading a stale value into
* cache if another thread has updated the value immediately after it has been
@@ -118,7 +117,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* @param p Optional predicate (may be {@code null}). If provided, will be used to
* filter values to be put into cache.
* @param args Optional user arguments to be passed into
- * {@link GridCacheStore#loadCache(IgniteBiInClosure, Object...)} method.
+ * {@link org.apache.ignite.cache.store.CacheStore#loadCache(IgniteBiInClosure, Object...)} method.
* @throws CacheException If loading failed.
*/
public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException;
@@ -130,14 +129,14 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* the value will be loaded from the primary node, which in its turn may load the value
* from the swap storage, and consecutively, if it's not in swap,
* from the underlying persistent storage. If value has to be loaded from persistent
- * storage, {@link GridCacheStore#load(IgniteTx, Object)} method will be used.
+ * storage, {@link org.apache.ignite.cache.store.CacheStore#load(IgniteTx, Object)} method will be used.
* <p>
* If the returned value is not needed, method {@link #putIfAbsent(Object, Object)} should
* always be used instead of this one to avoid the overhead associated with returning of the
* previous value.
* <p>
- * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore}
- * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method.
+ * If write-through is enabled, the stored value will be persisted to {@link org.apache.ignite.cache.store.CacheStore}
+ * via {@link org.apache.ignite.cache.store.CacheStore#put(IgniteTx, Object, Object)} method.
* <h2 class="header">Transactions</h2>
* This method is transactional and will enlist the entry into ongoing transaction
* if there is one.
@@ -164,8 +163,8 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* are acquired in undefined order, so it may cause a deadlock when used with
* other concurrent transactional updates.
* <p>
- * If write-through is enabled, the values will be removed from {@link GridCacheStore}
- * via {@link GridCacheStore#removeAll(IgniteTx, java.util.Collection)} method.
+ * If write-through is enabled, the values will be removed from {@link org.apache.ignite.cache.store.CacheStore}
+ * via {@link org.apache.ignite.cache.store.CacheStore#removeAll(IgniteTx, java.util.Collection)} method.
* <h2 class="header">Transactions</h2>
* This method is transactional and will enlist the entry into ongoing transaction
* if there is one.
@@ -319,13 +318,13 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* the value will be loaded from the primary node, which in its turn may load the value
* from the swap storage, and consecutively, if it's not in swap,
* from the underlying persistent storage. If value has to be loaded from persistent
- * storage, {@link GridCacheStore#load(IgniteTx, Object)} method will be used.
+ * storage, {@link org.apache.ignite.cache.store.CacheStore#load(IgniteTx, Object)} method will be used.
* <p>
* If the returned value is not needed, method {@link #putIf(Object, Object, IgnitePredicate)} should
* always be used instead of this one to avoid the overhead associated with returning of the previous value.
* <p>
- * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore}
- * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method.
+ * If write-through is enabled, the stored value will be persisted to {@link org.apache.ignite.cache.store.CacheStore}
+ * via {@link org.apache.ignite.cache.store.CacheStore#put(IgniteTx, Object, Object)} method.
* <h2 class="header">Transactions</h2>
* This method is transactional and will enlist the entry into ongoing transaction
* if there is one.
@@ -356,8 +355,8 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* value and, therefore, does not have any overhead associated with returning a value. It
* should be used whenever return value is not required.
* <p>
- * If write-through is enabled, the stored value will be persisted to {@link GridCacheStore}
- * via {@link GridCacheStore#put(IgniteTx, Object, Object)} method.
+ * If write-through is enabled, the stored value will be persisted to {@link org.apache.ignite.cache.store.CacheStore}
+ * via {@link org.apache.ignite.cache.store.CacheStore#put(IgniteTx, Object, Object)} method.
* <h2 class="header">Transactions</h2>
* This method is transactional and will enlist the entry into ongoing transaction
* if there is one.
@@ -384,14 +383,14 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* caches, the value will be loaded from the primary node, which in its turn may load the value
* from the disk-based swap storage, and consecutively, if it's not in swap,
* from the underlying persistent storage. If value has to be loaded from persistent
- * storage, {@link GridCacheStore#load(IgniteTx, Object)} method will be used.
+ * storage, {@link org.apache.ignite.cache.store.CacheStore#load(IgniteTx, Object)} method will be used.
* <p>
* If the returned value is not needed, method {@link #removeIf(Object, IgnitePredicate)} should
* always be used instead of this one to avoid the overhead associated with returning of the
* previous value.
* <p>
- * If write-through is enabled, the value will be removed from {@link GridCacheStore}
- * via {@link GridCacheStore#remove(IgniteTx, Object)} method.
+ * If write-through is enabled, the value will be removed from {@link org.apache.ignite.cache.store.CacheStore}
+ * via {@link org.apache.ignite.cache.store.CacheStore#remove(IgniteTx, Object)} method.
* <h2 class="header">Transactions</h2>
* This method is transactional and will enlist the entry into ongoing transaction
* if there is one.
@@ -416,8 +415,8 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
* This method will return {@code true} if remove did occur, which means that all optionally
* provided filters have passed and there was something to remove, {@code false} otherwise.
* <p>
- * If write-through is enabled, the value will be removed from {@link GridCacheStore}
- * via {@link GridCacheStore#remove(IgniteTx, Object)} method.
+ * If write-through is enabled, the value will be removed from {@link org.apache.ignite.cache.store.CacheStore}
+ * via {@link org.apache.ignite.cache.store.CacheStore#remove(IgniteTx, Object)} method.
* <h2 class="header">Transactions</h2>
* This method is transactional and will enlist the entry into ongoing transaction
* if there is one.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
new file mode 100644
index 0000000..0d7a85b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.integration.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+
+/**
+ * This adepter designed to support stores with bulk loading from stream-like source.
+ * <p>
+ * This class processes input data in the following way:
+ * <ul>
+ * <li>
+ * Iterator of input record obtained from user-defined {@link #inputIterator(Object...)}.
+ * </li>
+ * <li>
+ * Iterator continuously queried for input records and they are grouped into batches of {@link #batchSize}.
+ * </li>
+ * <li>
+ * Batch is placed into processing queue and puled by one of {@link #threadsCnt} working threads.
+ * </li>
+ * <li>
+ * Each record in batch is passed to user-defined {@link #parse(Object, Object...)} method
+ * and result is stored into cache.
+ * </li>
+ * </ul>
+ * <p>
+ * Two methods should be implemented by inheritants:
+ * <ul>
+ * <li>
+ * {@link #inputIterator(Object...)}. It should open underlying data source
+ * and iterate all record available in it. Individual records could be in very raw form,
+ * like text lines for CSV files.
+ * </li>
+ * <li>
+ * {@link #parse(Object, Object...)}. This method should process input records
+ * and transform them into key-value pairs for cache.
+ * </li>
+ * </ul>
+ * <p>
+ *
+ * @param <K> Key type.
+ * @param <V> Value type.
+ * @param <I> Input type.
+ */
+public abstract class CacheLoadOnlyStoreAdapter<K, V, I> implements CacheStore<K, V> {
+ /**
+ * Default batch size (number of records read with {@link #inputIterator(Object...)}
+ * and then submitted to internal pool at a time).
+ */
+ public static final int DFLT_BATCH_SIZE = 100;
+
+ /** Default batch queue size (max batches count to limit memory usage). */
+ public static final int DFLT_BATCH_QUEUE_SIZE = 100;
+
+ /** Default number of working threads (equal to the number of available processors). */
+ public static final int DFLT_THREADS_COUNT = Runtime.getRuntime().availableProcessors();
+
+ /** Auto-injected logger. */
+ @IgniteLoggerResource
+ private IgniteLogger log;
+
+ /** Batch size. */
+ private int batchSize = DFLT_BATCH_SIZE;
+
+ /** Size of queue of batches to process. */
+ private int batchQueueSize = DFLT_BATCH_QUEUE_SIZE;
+
+ /** Number fo working threads. */
+ private int threadsCnt = DFLT_THREADS_COUNT;
+
+ /**
+ * Returns iterator of input records.
+ * <p>
+ * Note that returned iterator doesn't have to be thread-safe. Thus it could
+ * operate on raw streams, DB connections, etc. without additional synchronization.
+ *
+ * @param args Arguments passes into {@link GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} method.
+ * @return Iterator over input records.
+ * @throws CacheLoaderException If iterator can't be created with the given arguments.
+ */
+ protected abstract Iterator<I> inputIterator(@Nullable Object... args) throws CacheLoaderException;
+
+ /**
+ * This method should transform raw data records into valid key-value pairs
+ * to be stored into cache.
+ * <p>
+ * If {@code null} is returned then this record will be just skipped.
+ *
+ * @param rec A raw data record.
+ * @param args Arguments passed into {@link GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} method.
+ * @return Cache entry to be saved in cache or {@code null} if no entry could be produced from this record.
+ */
+ @Nullable protected abstract IgniteBiTuple<K, V> parse(I rec, @Nullable Object... args);
+
+ /** {@inheritDoc} */
+ @Override public void loadCache(IgniteBiInClosure<K, V> c, @Nullable Object... args) {
+ ExecutorService exec = new ThreadPoolExecutor(
+ threadsCnt,
+ threadsCnt,
+ 0L,
+ MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(batchQueueSize),
+ new BlockingRejectedExecutionHandler());
+
+ Iterator<I> iter = inputIterator(args);
+
+ Collection<I> buf = new ArrayList<>(batchSize);
+
+ try {
+ while (iter.hasNext()) {
+ if (Thread.currentThread().isInterrupted()) {
+ U.warn(log, "Working thread was interrupted while loading data.");
+
+ break;
+ }
+
+ buf.add(iter.next());
+
+ if (buf.size() == batchSize) {
+ exec.submit(new Worker(c, buf, args));
+
+ buf = new ArrayList<>(batchSize);
+ }
+ }
+
+ if (!buf.isEmpty())
+ exec.submit(new Worker(c, buf, args));
+ }
+ catch (RejectedExecutionException ignored) {
+ // Because of custom RejectedExecutionHandler.
+ assert false : "RejectedExecutionException was thrown while it shouldn't.";
+ }
+ finally {
+ exec.shutdown();
+
+ try {
+ exec.awaitTermination(Long.MAX_VALUE, MILLISECONDS);
+ }
+ catch (InterruptedException ignored) {
+ U.warn(log, "Working thread was interrupted while waiting for put operations to complete.");
+
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Returns batch size.
+ *
+ * @return Batch size.
+ */
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ /**
+ * Sets batch size.
+ *
+ * @param batchSize Batch size.
+ */
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ /**
+ * Returns batch queue size.
+ *
+ * @return Batch queue size.
+ */
+ public int getBatchQueueSize() {
+ return batchQueueSize;
+ }
+
+ /**
+ * Sets batch queue size.
+ *
+ * @param batchQueueSize Batch queue size.
+ */
+ public void setBatchQueueSize(int batchQueueSize) {
+ this.batchQueueSize = batchQueueSize;
+ }
+
+ /**
+ * Returns number of worker threads.
+ *
+ * @return Number of worker threads.
+ */
+ public int getThreadsCount() {
+ return threadsCnt;
+ }
+
+ /**
+ * Sets number of worker threads.
+ *
+ * @param threadsCnt Number of worker threads.
+ */
+ public void setThreadsCount(int threadsCnt) {
+ this.threadsCnt = threadsCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V load(K key) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> loadAll(Iterable<? extends K> keys) {
+ return Collections.emptyMap();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends K, ? extends V> entry) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deleteAll(Collection<?> keys) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void txEnd(boolean commit) {
+ // No-op.
+ }
+
+ /**
+ * Worker.
+ */
+ private class Worker implements Runnable {
+ /** */
+ private final IgniteBiInClosure<K, V> c;
+
+ /** */
+ private final Collection<I> buf;
+
+ /** */
+ private final Object[] args;
+
+ /**
+ * @param c Closure for loaded entries.
+ * @param buf Set of input records to process.
+ * @param args Arguments passed into {@link GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} method.
+ */
+ Worker(IgniteBiInClosure<K, V> c, Collection<I> buf, Object[] args) {
+ this.c = c;
+ this.buf = buf;
+ this.args = args;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ for (I rec : buf) {
+ IgniteBiTuple<K, V> entry = parse(rec, args);
+
+ if (entry != null)
+ c.apply(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ /**
+ * This handler blocks the caller thread until free space will be available in tasks queue.
+ * If the executor is shut down than it throws {@link RejectedExecutionException}.
+ * <p>
+ * It is save to apply this policy when:
+ * <ol>
+ * <li>{@code shutdownNow} is not used on the pool.</li>
+ * <li>{@code shutdown} is called from the thread where all submissions where performed.</li>
+ * </ol>
+ */
+ private class BlockingRejectedExecutionHandler implements RejectedExecutionHandler {
+ /** {@inheritDoc} */
+ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ try {
+ if (executor.isShutdown())
+ throw new RejectedExecutionException();
+ else
+ executor.getQueue().put(r);
+ }
+ catch (InterruptedException ignored) {
+ U.warn(log, "Working thread was interrupted while loading data.");
+
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLocalStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLocalStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLocalStore.java
new file mode 100644
index 0000000..6fdec5a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLocalStore.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import java.lang.annotation.*;
+
+/**
+ * Annotation for local {@link CacheStore} implementation. "Local" here means that there is no global
+ * database behind the grid but each node has an independent one.
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface CacheLocalStore {
+ // No-op.
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
new file mode 100644
index 0000000..4cdfe5a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.IgnitePortables;
+import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.portables.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.cache.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.integration.*;
+import java.sql.*;
+import java.util.*;
+import java.util.Date;
+
+import static javax.cache.Cache.*;
+
+/**
+ * API for cache persistent storage for read-through and write-through behavior.
+ * Persistent store is configured via {@link GridCacheConfiguration#getStore()}
+ * configuration property. If not provided, values will be only kept in cache memory
+ * or swap storage without ever being persisted to a persistent storage.
+ * <p>
+ * {@link CacheStoreAdapter} provides default implementation for bulk operations,
+ * such as {@link #loadAll(Iterable)},
+ * {@link #writeAll(Collection)}, and {@link #deleteAll(Collection)}
+ * by sequentially calling corresponding {@link #load(Object)},
+ * {@link #write(Entry)}, and {@link #delete(Object)}
+ * operations. Use this adapter whenever such behaviour is acceptable. However
+ * in many cases it maybe more preferable to take advantage of database batch update
+ * functionality, and therefore default adapter implementation may not be the best option.
+ * <p>
+ * Provided implementations may be used for test purposes:
+ * <ul>
+ * <li>{@gglink org.gridgain.grid.cache.store.hibernate.GridCacheHibernateBlobStore}</li>
+ * <li>{@link CacheJdbcBlobStore}</li>
+ * </ul>
+ * <p>
+ * All transactional operations of this API are provided with ongoing {@link IgniteTx},
+ * if any. As transaction is {@link GridMetadataAware}, you can attach any metadata to
+ * it, e.g. to recognize if several operations belong to the same transaction or not.
+ * Here is an example of how attach a JDBC connection as transaction metadata:
+ * <pre name="code" class="java">
+ * Connection conn = tx.meta("some.name");
+ *
+ * if (conn == null) {
+ * conn = ...; // Get JDBC connection.
+ *
+ * // Store connection in transaction metadata, so it can be accessed
+ * // for other operations on the same transaction.
+ * tx.addMeta("some.name", conn);
+ * }
+ * </pre>
+ * <h1 class="header">Working With Portable Objects</h1>
+ * When portables are enabled for cache by setting {@link GridCacheConfiguration#isPortableEnabled()} to
+ * {@code true}), all portable keys and values are converted to instances of {@link PortableObject}.
+ * Therefore, all cache store methods will take parameters in portable format. To avoid class
+ * cast exceptions, store must have signature compatible with portables. E.g., if you use {@link Integer}
+ * as a key and {@code Value} class as a value (which will be converted to portable format), cache store
+ * signature should be the following:
+ * <pre name="code" class="java">
+ * public class PortableCacheStore implements GridCacheStore<Integer, GridPortableObject> {
+ * public void put(@Nullable GridCacheTx tx, Integer key, GridPortableObject val) throws IgniteCheckedException {
+ * ...
+ * }
+ *
+ * ...
+ * }
+ * </pre>
+ * This behavior can be overridden by setting {@link GridCacheConfiguration#setKeepPortableInStore(boolean)}
+ * flag value to {@code false}. In this case, GridGain will deserialize keys and values stored in portable
+ * format before they are passed to cache store, so that you can use the following cache store signature instead:
+ * <pre name="code" class="java">
+ * public class ObjectsCacheStore implements GridCacheStore<Integer, Person> {
+ * public void put(@Nullable GridCacheTx tx, Integer key, Person val) throws GridException {
+ * ...
+ * }
+ *
+ * ...
+ * }
+ * </pre>
+ * Note that while this can simplify store implementation in some cases, it will cause performance degradation
+ * due to additional serializations and deserializations of portable objects. You will also need to have key
+ * and value classes on all nodes since portables will be deserialized when store is invoked.
+ * <p>
+ * Note that only portable classes are converted to {@link PortableObject} format. Following
+ * types are stored in cache without changes and therefore should not affect cache store signature:
+ * <ul>
+ * <li>All primitives (byte, int, ...) and there boxed versions (Byte, Integer, ...)</li>
+ * <li>Arrays of primitives (byte[], int[], ...)</li>
+ * <li>{@link String} and array of {@link String}s</li>
+ * <li>{@link UUID} and array of {@link UUID}s</li>
+ * <li>{@link Date} and array of {@link Date}s</li>
+ * <li>{@link Timestamp} and array of {@link Timestamp}s</li>
+ * <li>Enums and array of enums</li>
+ * <li>
+ * Maps, collections and array of objects (but objects inside
+ * them will still be converted if they are portable)
+ * </li>
+ * </ul>
+ *
+ * @see IgnitePortables
+ */
+public interface CacheStore<K, V> extends CacheLoader<K, V>, CacheWriter<K, V> {
+ /**
+ * Loads all values from underlying persistent storage. Note that keys are not
+ * passed, so it is up to implementation to figure out what to load. This method
+ * is called whenever {@link GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)}
+ * method is invoked which is usually to preload the cache from persistent storage.
+ * <p>
+ * This method is optional, and cache implementation does not depend on this
+ * method to do anything. Default implementation of this method in
+ * {@link CacheStoreAdapter} does nothing.
+ * <p>
+ * For every loaded value method {@link org.apache.ignite.lang.IgniteBiInClosure#apply(Object, Object)}
+ * should be called on the passed in closure. The closure will then make sure
+ * that the loaded value is stored in cache.
+ *
+ * @param clo Closure for loaded values.
+ * @param args Arguments passes into
+ * {@link GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} method.
+ * @throws CacheLoaderException If loading failed.
+ */
+ public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) throws CacheLoaderException;
+
+ /**
+ * Tells store to commit or rollback a transaction depending on the value of the {@code 'commit'}
+ * parameter.
+ *
+ * @param commit {@code True} if transaction should commit, {@code false} for rollback.
+ * @throws CacheWriterException If commit or rollback failed. Note that commit failure in some cases
+ * may bring cache transaction into {@link IgniteTxState#UNKNOWN} which will
+ * consequently cause all transacted entries to be invalidated.
+ */
+ public void txEnd(boolean commit) throws CacheWriterException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java
new file mode 100644
index 0000000..4ec5cbf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreAdapter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import java.util.*;
+
+/**
+ * Cache storage convenience adapter. It provides default implementation for bulk operations, such
+ * as {@link #loadAll(Iterable)},
+ * {@link #writeAll(Collection)}, and {@link #deleteAll(Collection)}
+ * by sequentially calling corresponding {@link #load(Object)},
+ * {@link #write(Cache.Entry)}, and {@link #delete(Object)}
+ * operations. Use this adapter whenever such behaviour is acceptable. However in many cases
+ * it maybe more preferable to take advantage of database batch update functionality, and therefore
+ * default adapter implementation may not be the best option.
+ * <p>
+ * Note that method {@link #loadCache(org.apache.ignite.lang.IgniteBiInClosure, Object...)} has empty
+ * implementation because it is essentially up to the user to invoke it with
+ * specific arguments.
+ */
+public abstract class CacheStoreAdapter<K, V> implements CacheStore<K, V> {
+ /** */
+ @IgniteCacheSessionResource
+ private CacheStoreSession ses;
+
+ /**
+ * Default empty implementation. This method needs to be overridden only if
+ * {@link GridCache#loadCache(IgniteBiPredicate, long, Object...)} method
+ * is explicitly called.
+ *
+ * @param clo {@inheritDoc}
+ * @param args {@inheritDoc}
+ */
+ @Override public void loadCache(IgniteBiInClosure<K, V> clo, Object... args) {
+ /* No-op. */
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> loadAll(Iterable<? extends K> keys) {
+ assert keys != null;
+
+ Map<K, V> loaded = new HashMap<>();
+
+ for (K key : keys) {
+ V v = load(key);
+
+ if (v != null)
+ loaded.put(key, v);
+ }
+
+ return loaded;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) {
+ assert entries != null;
+
+ for (Cache.Entry<? extends K, ? extends V> e : entries)
+ write(e);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deleteAll(Collection<?> keys) {
+ assert keys != null;
+
+ for (Object key : keys)
+ delete(key);
+ }
+
+ /**
+ * Default empty implementation for ending transactions. Note that if explicit cache
+ * transactions are not used, then transactions do not have to be explicitly ended -
+ * for all other cases this method should be overridden with custom commit/rollback logic.
+ *
+ * @param commit {@inheritDoc}
+ */
+ @Override public void txEnd(boolean commit) {
+ // No-op.
+ }
+
+ /**
+ * @return Current session.
+ */
+ @Nullable protected CacheStoreSession session() {
+ return ses;
+ }
+
+ /**
+ * @return Current transaction.
+ */
+ @Nullable protected IgniteTx transaction() {
+ return ses != null ? ses.transaction() : null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4b8ec5f2/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreBalancingWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreBalancingWrapper.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreBalancingWrapper.java
new file mode 100644
index 0000000..c76d4ec
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreBalancingWrapper.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.util.future.*;
+import org.gridgain.grid.util.typedef.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.integration.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Cache store wrapper that ensures that there will be no more that one thread loading value from underlying store.
+ */
+public class CacheStoreBalancingWrapper<K, V> implements CacheStore<K, V> {
+ /** */
+ public static final int DFLT_LOAD_ALL_THRESHOLD = 5;
+
+ /** Delegate store. */
+ private CacheStore<K, V> delegate;
+
+ /** Pending cache store loads. */
+ private ConcurrentMap<K, LoadFuture> pendingLoads = new ConcurrentHashMap8<>();
+
+ /** Load all threshold. */
+ private int loadAllThreshold = DFLT_LOAD_ALL_THRESHOLD;
+
+ /**
+ * @param delegate Delegate store.
+ */
+ public CacheStoreBalancingWrapper(CacheStore<K, V> delegate) {
+ this.delegate = delegate;
+ }
+
+ /**
+ * @param delegate Delegate store.
+ * @param loadAllThreshold Load all threshold.
+ */
+ public CacheStoreBalancingWrapper(CacheStore<K, V> delegate, int loadAllThreshold) {
+ this.delegate = delegate;
+ this.loadAllThreshold = loadAllThreshold;
+ }
+
+ /**
+ * @return Load all threshold.
+ */
+ public int loadAllThreshold() {
+ return loadAllThreshold;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public V load(K key) {
+ LoadFuture fut = pendingLoads.get(key);
+
+ try {
+ if (fut != null)
+ return fut.get(key);
+
+ fut = new LoadFuture();
+
+ LoadFuture old = pendingLoads.putIfAbsent(key, fut);
+
+ if (old != null)
+ return old.get(key);
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheLoaderException(e);
+ }
+
+ try {
+ V val = delegate.load(key);
+
+ fut.onComplete(key, val);
+
+ return val;
+ }
+ catch (Throwable e) {
+ fut.onError(key, e);
+
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) {
+ delegate.loadCache(clo, args);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
+ return delegate.loadAll(keys);
+ }
+
+ /**
+ * @param keys Keys to load.
+ * @param c Closure for loaded values.
+ */
+ public void loadAll(Collection<? extends K> keys, final IgniteBiInClosure<K, V> c) {
+ assert keys.size() < loadAllThreshold;
+
+ Collection<K> needLoad = null;
+ Map<K, LoadFuture> pending = null;
+ LoadFuture span = null;
+
+ for (K key : keys) {
+ LoadFuture fut = pendingLoads.get(key);
+
+ if (fut != null) {
+ if (pending == null)
+ pending = new HashMap<>();
+
+ pending.put(key, fut);
+ }
+ else {
+ // Try to concurrently add pending future.
+ if (span == null)
+ span = new LoadFuture();
+
+ LoadFuture old = pendingLoads.putIfAbsent(key, span);
+
+ if (old != null) {
+ if (pending == null)
+ pending = new HashMap<>();
+
+ pending.put(key, old);
+ }
+ else {
+ if (needLoad == null)
+ needLoad = new ArrayList<>(keys.size());
+
+ needLoad.add(key);
+ }
+ }
+ }
+
+ if (needLoad != null) {
+ assert !needLoad.isEmpty();
+ assert span != null;
+
+ try {
+ Map<K, V> loaded = delegate.loadAll(needLoad);
+
+ for (Map.Entry<K, V> e : loaded.entrySet())
+ c.apply(e.getKey(), e.getValue());
+
+ span.onComplete(needLoad, loaded);
+ }
+ catch (Throwable e) {
+ span.onError(needLoad, e);
+
+ throw e;
+ }
+ }
+
+ if (pending != null) {
+ try {
+ for (Map.Entry<K, LoadFuture> e : pending.entrySet()) {
+ K key = e.getKey();
+
+ c.apply(key, e.getValue().get(key));
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheLoaderException(e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends K, ? extends V> entry) {
+ delegate.write(entry);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) {
+ delegate.writeAll(entries);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ delegate.delete(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
+ delegate.deleteAll(keys);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void txEnd(boolean commit) {
+ delegate.txEnd(commit);
+ }
+
+ /**
+ *
+ */
+ private class LoadFuture extends GridFutureAdapter<Map<K, V>> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Collection of keys for pending cleanup. */
+ private volatile Collection<K> keys;
+
+ /**
+ *
+ */
+ public LoadFuture() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable Map<K, V> res, @Nullable Throwable err) {
+ if (super.onDone(res, err)) {
+ assert keys != null;
+
+ for (K key : keys)
+ pendingLoads.remove(key, this);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param key Key.
+ * @param val Loaded value.
+ */
+ public void onComplete(K key, V val) {
+ onComplete(Collections.singletonList(key), F.asMap(key, val));
+ }
+
+ /**
+ * @param keys Keys.
+ * @param res Loaded values.
+ */
+ public void onComplete(Collection<K> keys, Map<K, V> res) {
+ this.keys = keys;
+
+ onDone(res);
+ }
+
+ /**
+ * @param key Key.
+ * @param err Error.
+ */
+ public void onError(K key, Throwable err) {
+
+ }
+
+ /**
+ * @param keys Keys.
+ * @param err Error.
+ */
+ public void onError(Collection<K> keys, Throwable err) {
+ this.keys = keys;
+
+ onDone(err);
+ }
+
+ /**
+ * Gets value loaded for key k.
+ *
+ * @param key Key to load.
+ * @return Loaded value (possibly {@code null}).
+ * @throws IgniteCheckedException If load failed.
+ */
+ public V get(K key) throws IgniteCheckedException {
+ return get().get(key);
+ }
+ }
+}
|