#ignite-565: Revert GridCacheReduceQueryMultithreadedSelfTest. Change signature of
IgniteCache.clearAll(Set<? extends K> keys).
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1ab6d387
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1ab6d387
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1ab6d387
Branch: refs/heads/ignite-443
Commit: 1ab6d38727b67a688ecca5bab4a59a8fa089336d
Parents: 2b788cc
Author: ivasilinets <ivasilinets@gridgain.com>
Authored: Thu Mar 26 15:16:28 2015 +0300
Committer: ivasilinets <ivasilinets@gridgain.com>
Committed: Thu Mar 26 15:16:28 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCache.java | 2 +-
.../processors/cache/CacheProjection.java | 6 +-
.../processors/cache/GridCacheAdapter.java | 10 +-
.../cache/GridCacheProjectionImpl.java | 6 +-
.../processors/cache/GridCacheProxyImpl.java | 6 +-
.../processors/cache/IgniteCacheProxy.java | 2 +-
...idCacheReduceQueryMultithreadedSelfTest.java | 155 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 1 +
8 files changed, 172 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 4873fcb..5c5bb25 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -378,7 +378,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K,
V>, IgniteAsyncS
* @throws CacheException if there is a problem during the clear
*/
@IgniteAsyncSupported
- public void clearAll(Set<K> keys);
+ public void clearAll(Set<? extends K> keys);
/**
* Clear entry from the cache and swap storage, without notifying listeners or
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java
index 6659735..558f8af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheProjection.java
@@ -1268,7 +1268,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K,
V>> {
*
* @param keys Keys to clearLocally.
*/
- public void clearLocallyAll(Set<K> keys);
+ public void clearLocallyAll(Set<? extends K> keys);
/**
* Clears key on all nodes that store it's data. That is, caches are cleared on remote
@@ -1294,7 +1294,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K,
V>> {
* @param keys Keys to clear.
* @throws IgniteCheckedException In case of cache could not be cleared on any of the
nodes.
*/
- public void clearAll(Set<K> keys) throws IgniteCheckedException;
+ public void clearAll(Set<? extends K> keys) throws IgniteCheckedException;
/**
* Clears cache on all nodes that store it's data. That is, caches are cleared on remote
@@ -1326,7 +1326,7 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K,
V>> {
* @param keys Keys to clear.
* @return Clear future.
*/
- public IgniteInternalFuture<?> clearAsync(Set<K> keys);
+ public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys);
/**
* Clears cache on all nodes that store it's data. That is, caches are cleared on remote
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index cd0aec0..1973330 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1350,7 +1350,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K,
V>,
}
/** {@inheritDoc} */
- @Override public void clearLocallyAll(Set<K> keys) {
+ @Override public void clearLocallyAll(Set<? extends K> keys) {
clearLocally0(keys);
}
@@ -1458,7 +1458,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K,
V>,
}
/** {@inheritDoc} */
- @Override public void clearAll(Set<K> keys) throws IgniteCheckedException {
+ @Override public void clearAll(Set<? extends K> keys) throws IgniteCheckedException
{
// Clear local cache synchronously.
clearLocallyAll(keys);
@@ -1471,7 +1471,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K,
V>,
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> clearAsync(Set<K> keys) {
+ @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys)
{
return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys));
}
@@ -5640,7 +5640,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K,
V>,
private static final long serialVersionUID = 0L;
/** Keys to remove. */
- private Set<K> keys;
+ private Set<? extends K> keys;
/**
* Empty constructor for serialization.
@@ -5653,7 +5653,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K,
V>,
* @param cacheName Cache name.
* @param keys Keys to clear.
*/
- private GlobalClearKeySetCallable(String cacheName, Set<K> keys) {
+ private GlobalClearKeySetCallable(String cacheName, Set<? extends K> keys)
{
super(cacheName);
this.keys = keys;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
index 5c05f30..4ab187d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
@@ -875,7 +875,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K,
V
}
/** {@inheritDoc} */
- @Override public void clearLocallyAll(Set<K> keys) {
+ @Override public void clearLocallyAll(Set<? extends K> keys) {
cache.clearLocallyAll(keys);
}
@@ -890,7 +890,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K,
V
}
/** {@inheritDoc} */
- @Override public void clearAll(Set<K> keys) throws IgniteCheckedException {
+ @Override public void clearAll(Set<? extends K> keys) throws IgniteCheckedException
{
cache.clearAll(keys);
}
@@ -900,7 +900,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K,
V
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> clearAsync(Set<K> keys) {
+ @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys)
{
return cache.clearAsync(keys);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 772a3b7..1c25ef0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -1288,7 +1288,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K,
V>, Externali
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> clearAsync(Set<K> keys) {
+ @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys)
{
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1324,7 +1324,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K,
V>, Externali
}
/** {@inheritDoc} */
- @Override public void clearLocallyAll(Set<K> keys) {
+ @Override public void clearLocallyAll(Set<? extends K> keys) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
@@ -1348,7 +1348,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K,
V>, Externali
}
/** {@inheritDoc} */
- @Override public void clearAll(Set<K> keys) throws IgniteCheckedException {
+ @Override public void clearAll(Set<? extends K> keys) throws IgniteCheckedException
{
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 22bfe39..2e1deaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -1024,7 +1024,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
}
/** {@inheritDoc} */
- @Override public void clearAll(Set<K> keys) {
+ @Override public void clearAll(Set<? extends K> keys) {
GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
new file mode 100644
index 0000000..2dfa542
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.marshaller.optimized.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Multithreaded reduce query test: one thread puts entries while another repeatedly runs a reducing SQL query.
+ */
+public class GridCacheReduceQueryMultithreadedSelfTest extends GridCacheAbstractSelfTest
{
+ /** Number of grids reported by {@link #gridCount()}. */
+ private static final int GRID_CNT = 5;
+
+ /** Timeout (2 * 60 * 1000) returned by {@link #getTestTimeout()} and passed to the query's {@code timeout(...)}. */
+ private static final int TEST_TIMEOUT = 2 * 60 * 1000;
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return GRID_CNT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return TEST_TIMEOUT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+ IgniteConfiguration c = super.getConfiguration(gridName);
+
+ c.setMarshaller(new OptimizedMarshaller(false));
+
+ return c;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception
{
+ CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setBackups(1);
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ cfg.setIndexedTypes(
+ String.class, Integer.class
+ );
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ public void testReduceQuery() throws Exception {
+ final int keyCnt = 5000;
+ final int logFreq = 500;
+
+ final GridCacheAdapter<String, Integer> c = internalCache(jcache());
+
+ final CountDownLatch startLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable() {
+ @Override public Object call() throws Exception {
+ for (int i = 1; i < keyCnt; i++) {
+ c.put(String.valueOf(i), i);
+
+ startLatch.countDown(); // Called on every iteration; the latch starts at 1, so only the first call releases the waiter.
+
+ if (i % logFreq == 0)
+ info("Stored entries: " + i);
+ }
+
+ return null;
+ }
+ }, 1);
+
+ // Create SQL query for Integer values matching "_val > 0", with TEST_TIMEOUT as its timeout.
+ final CacheQuery<Map.Entry<String, Integer>> sumQry =
+ c.queries().createSqlQuery(Integer.class, "_val > 0").timeout(TEST_TIMEOUT);
+
+ final R1<Map.Entry<String, Integer>, Integer> rmtRdc = new R1<Map.Entry<String,
Integer>, Integer>() {
+ /** Running total of collected entry values; returned by {@link #reduce()}. */
+ private AtomicInteger sum = new AtomicInteger();
+
+ @Override public boolean collect(Map.Entry<String, Integer> e) {
+ sum.addAndGet(e.getValue());
+
+ return true;
+ }
+
+ @Override public Integer reduce() {
+ return sum.get();
+ }
+ };
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ startLatch.await(); // Block until the writer thread has put at least one entry.
+
+ IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable() {
+ @Override public Object call() throws Exception {
+ int cnt = 0;
+
+ while (!stop.get()) {
+ Collection<Integer> res = sumQry.execute(rmtRdc).get();
+
+ int sum = F.sumInt(res);
+
+ cnt++;
+
+ assertTrue(sum > 0);
+
+ if (cnt % logFreq == 0) {
+ info("Reduced value: " + sum);
+ info("Executed queries: " + cnt);
+ }
+ }
+
+ return null;
+ }
+ }, 1);
+
+ fut1.get();
+
+ stop.set(true);
+
+ fut2.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ab6d387/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index dc9a480..e29af9f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -62,6 +62,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.class);
suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class);
// suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); TODO IGNITE-484
+ suite.addTestSuite(GridCacheReduceQueryMultithreadedSelfTest.class);
suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class);
// Fields queries.
|