ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [39/39] incubator-ignite git commit: Merge branch 'sprint-1' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-sql-tests
Date Thu, 05 Feb 2015 21:40:50 GMT
Merge branch 'sprint-1' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-sql-tests

Conflicts:
	modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFieldsQuerySelfTest.java
	modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryLoadSelfTest.java
	modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/97a9adc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/97a9adc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/97a9adc8

Branch: refs/heads/ignite-sql-tests
Commit: 97a9adc8563ce0a17a03feb6c066b4524d84e514
Parents: 5542fea ce0c304
Author: S.Vladykin <svladykin@gridgain.com>
Authored: Fri Feb 6 00:33:26 2015 +0300
Committer: S.Vladykin <svladykin@gridgain.com>
Committed: Fri Feb 6 00:33:26 2015 +0300

----------------------------------------------------------------------
 .../datagrid/CacheContinuousQueryExample.java   |   2 +-
 .../examples/ScalarCacheAffinityExample1.scala  |   2 +-
 .../examples/ScalarCacheAffinityExample2.scala  |   2 +-
 .../ScalarCacheAffinitySimpleExample.scala      |   2 +-
 .../scalar/examples/ScalarCacheExample.scala    |   2 +-
 .../ScalarCachePopularNumbersExample.scala      |   4 +-
 .../examples/ScalarCacheQueryExample.scala      |   2 +-
 .../examples/ScalarSnowflakeSchemaExample.scala |   4 +-
 .../ClientAbstractMultiThreadedSelfTest.java    |   2 +-
 .../integration/ClientAbstractSelfTest.java     |   4 +-
 .../rest/AbstractRestProcessorSelfTest.java     |   2 +-
 .../rest/RestBinaryProtocolSelfTest.java        |   6 +-
 .../rest/RestMemcacheProtocolSelfTest.java      |   6 +-
 .../loadtests/client/ClientTcpSslLoadTest.java  |   7 +-
 .../org/apache/ignite/cache/CacheEntry.java     |   4 +-
 .../apache/ignite/cache/CacheProjection.java    |  45 +++--
 .../discovery/GridDiscoveryManager.java         |  22 +--
 .../affinity/GridAffinityAssignmentCache.java   |  23 ++-
 .../processors/cache/GridCacheAdapter.java      |  70 +++----
 .../cache/GridCacheClearAllRunnable.java        |  12 +-
 .../cache/GridCacheConcurrentMap.java           |   2 +-
 .../cache/GridCacheDeploymentManager.java       |   4 +-
 .../processors/cache/GridCacheEntryImpl.java    |   2 +-
 .../processors/cache/GridCacheEntrySet.java     |   2 +-
 .../processors/cache/GridCacheKeySet.java       |   2 +-
 .../processors/cache/GridCacheMapAdapter.java   |   2 +-
 .../processors/cache/GridCacheMapEntry.java     |  10 +-
 .../cache/GridCacheProjectionImpl.java          |  25 ++-
 .../processors/cache/GridCacheProxyImpl.java    |  32 +++-
 .../processors/cache/GridCacheUtils.java        |   2 +-
 .../cache/GridCacheValueCollection.java         |   2 +-
 .../processors/cache/IgniteCacheProxy.java      |  42 +----
 .../GridDistributedCacheAdapter.java            | 134 ++++++++++++++
 .../distributed/dht/GridDhtCacheAdapter.java    |   4 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   5 -
 .../distributed/near/GridNearAtomicCache.java   |   4 +-
 .../distributed/near/GridNearCacheAdapter.java  |  16 +-
 .../near/GridNearCacheClearAllRunnable.java     |   2 +-
 .../processors/cache/local/GridLocalCache.java  |   6 +
 .../local/atomic/GridLocalAtomicCache.java      |   4 +-
 .../GridCacheContinuousQueryAdapter.java        |  45 +++--
 .../GridCacheContinuousQueryHandler.java        |  15 +-
 .../continuous/GridContinuousProcessor.java     |  15 +-
 .../dataload/GridDataLoadCacheUpdaters.java     |  12 +-
 .../portable/GridPortableInputStream.java       |   7 +
 .../processors/rest/GridRestProcessor.java      |   2 +
 .../visor/cache/VisorCacheClearTask.java        |   2 +-
 ...cheAbstractFullApiMultithreadedSelfTest.java |  22 ---
 .../cache/GridCacheAbstractFullApiSelfTest.java | 182 ++++++-------------
 .../GridCacheAbstractProjectionSelfTest.java    |  31 +---
 .../cache/GridCacheAbstractSelfTest.java        |  12 +-
 .../cache/GridCacheAbstractTxReadTest.java      |   2 +-
 .../cache/GridCacheBasicStoreAbstractTest.java  |   8 +-
 ...acheBasicStoreMultithreadedAbstractTest.java |   2 +-
 .../cache/GridCacheClearAllSelfTest.java        |  32 ++--
 ...idCacheGetAndTransformStoreAbstractTest.java |   4 +-
 .../cache/GridCacheGlobalClearAllSelfTest.java  |   6 +-
 .../GridCacheWriteBehindStoreAbstractTest.java  |   6 +-
 .../GridCacheBasicOpAbstractTest.java           |   2 +-
 .../distributed/GridCacheLockAbstractTest.java  |  21 +++
 .../GridCacheMultiNodeAbstractTest.java         |   2 +-
 .../GridCacheMultiNodeLockAbstractTest.java     |   2 +-
 ...heAbstractTransformWriteThroughSelfTest.java |   2 +-
 .../dht/GridCacheDhtEntrySelfTest.java          |   4 +-
 ...GridCacheDhtEvictionNearReadersSelfTest.java |   2 +-
 .../dht/GridCacheDhtEvictionSelfTest.java       |   2 +-
 ...tomicClientOnlyMultiNodeFullApiSelfTest.java |  16 +-
 ...eAtomicNearOnlyMultiNodeFullApiSelfTest.java |   8 +-
 ...idCacheNearOnlyMultiNodeFullApiSelfTest.java |  20 +-
 .../GridCacheNearPartitionedClearSelfTest.java  |   4 +-
 ...ePartitionedMultiThreadedPutGetSelfTest.java |   5 +-
 ...hePartitionedQueryMultiThreadedSelfTest.java |   2 +-
 ...dCacheContinuousQueryReplicatedSelfTest.java |  65 +++++++
 .../fs/GridGgfsDataManagerSelfTest.java         |   4 +-
 .../loadtests/cache/GridCacheBenchmark.java     |   2 +-
 .../loadtests/cache/GridCacheLoadTest.java      |   2 +-
 .../GridCacheWriteBehindStoreLoadTest.java      |   2 +-
 .../hadoop/jobtracker/GridHadoopJobTracker.java |   2 +-
 .../GridHibernateAccessStrategyAdapter.java     |   4 +-
 ...idHibernateL2CacheConfigurationSelfTest.java |   2 +-
 .../hibernate/GridHibernateL2CacheSelfTest.java |   2 +-
 .../cache/GridCacheOffHeapAndSwapSelfTest.java  |   2 +-
 .../cache/IgniteCacheQueryLoadSelfTest.java     |   3 +-
 .../IgniteCacheQueryMultiThreadedSelfTest.java  |  12 +-
 .../IgniteCacheReplicatedQuerySelfTest.java     |   2 +-
 .../cache/VisorCacheCompactCommandSpec.scala    |   4 +-
 86 files changed, 640 insertions(+), 490 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/97a9adc8/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/97a9adc8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/97a9adc8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/97a9adc8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/97a9adc8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryMultiThreadedSelfTest.java
index 74f5773,0000000..32864a8
mode 100644,000000..100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryMultiThreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryMultiThreadedSelfTest.java
@@@ -1,306 -1,0 +1,306 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.cache.distributed.near;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.cache.*;
 +import org.apache.ignite.cache.query.*;
 +import org.apache.ignite.cache.query.annotations.*;
 +import org.apache.ignite.configuration.*;
 +import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.util.tostring.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.apache.ignite.spi.discovery.tcp.*;
 +import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 +import org.apache.ignite.testframework.*;
 +import org.apache.ignite.testframework.junits.common.*;
 +
 +import javax.cache.*;
 +import java.io.*;
 +import java.util.*;
 +import java.util.concurrent.atomic.*;
 +
 +import static org.apache.ignite.cache.CacheAtomicityMode.*;
 +import static org.apache.ignite.cache.CacheDistributionMode.*;
 +import static org.apache.ignite.cache.CacheMode.*;
 +
 +/**
 + * Tests for partitioned cache queries.
 + */
 +public class IgniteCachePartitionedQueryMultiThreadedSelfTest extends GridCommonAbstractTest {
 +    /** Whether {@link #info(String)} forwards messages to {@code super.info}; {@code false} silences them. */
 +    private static final boolean TEST_INFO = true;
 +
 +    /** Number of test grids (nodes). Should not be less than 2. */
 +    private static final int GRID_CNT = 3;
 +
 +    /** IP finder handed to the discovery SPI of every grid configured by this test. */
 +    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 +
 +    /**
 +     * Don't start grid by default: {@code false} is passed to the parent constructor and the
 +     * grids are started explicitly in {@link #beforeTestsStarted()}.
 +     */
 +    public IgniteCachePartitionedQueryMultiThreadedSelfTest() {
 +        super(false);
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * On top of the parent configuration this installs a {@link TcpDiscoverySpi} that uses
 +     * {@link #ipFinder} and a single cache configuration: {@code PARTITIONED} mode,
 +     * {@code FULL_SYNC} write synchronization, no backups, {@code SYNC} preloading,
 +     * {@code TRANSACTIONAL} atomicity and {@code NEAR_PARTITIONED} distribution.
 +     */
 +    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
 +        IgniteConfiguration c = super.getConfiguration(gridName);
 +
 +        TcpDiscoverySpi disco = new TcpDiscoverySpi();
 +
 +        disco.setIpFinder(ipFinder);
 +
 +        c.setDiscoverySpi(disco);
 +
 +        CacheConfiguration cc = defaultCacheConfiguration();
 +
 +        cc.setCacheMode(PARTITIONED);
 +
 +        // Query should be executed without ongoing transactions.
 +        cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 +        cc.setBackups(0);
 +        cc.setPreloadMode(CachePreloadMode.SYNC);
 +        cc.setAtomicityMode(TRANSACTIONAL);
 +        cc.setDistributionMode(NEAR_PARTITIONED);
 +
 +        c.setCacheConfiguration(cc);
 +
 +        return c;
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * Asserts that {@link #GRID_CNT} is at least 2 and then calls
 +     * {@code startGridsMultiThreaded(GRID_CNT)}.
 +     */
 +    @Override protected void beforeTestsStarted() throws Exception {
 +        assert GRID_CNT >= 2 : "Constant GRID_CNT must be greater than or equal to 2.";
 +
 +        startGridsMultiThreaded(GRID_CNT);
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * Calls {@code stopAllGrids()}.
 +     */
 +    @Override protected void afterTestsStopped() throws Exception {
 +        stopAllGrids();
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * After the parent clean-up, calls {@code removeAll()} on the {@code null}-named cache of each of
 +     * the {@link #GRID_CNT} grids.
 +     */
 +    @Override protected void afterTest() throws Exception {
 +        super.afterTest();
 +
 +        // Clean up all caches.
 +        for (int i = 0; i < GRID_CNT; i++)
-             grid(i).cache(null).removeAll(F.<CacheEntry<Object, Object>>alwaysTrue());
++            grid(i).cache(null).removeAll();
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * The message is passed to {@code super.info} only when {@link #TEST_INFO} is {@code true}.
 +     */
 +    @Override protected void info(String msg) {
 +        if (TEST_INFO)
 +            super.info(msg);
 +    }
 +
 +    /**
 +     * Runs text and SQL queries against the same cache from many threads at once.
 +     * <p>
 +     * Four persons (two with degree "Master", two with "Bachelor") are put into the cache of grid 0.
 +     * Then {@code luceneThreads} threads repeatedly run a {@code QueryTextPredicate} for "Master" and
 +     * {@code sqlThreads} threads repeatedly run a {@code QuerySqlPredicate} for
 +     * {@code degree = 'Bachelor'}; every result is passed to {@code checkResult}. After
 +     * {@code duration} milliseconds the {@code done} flag ends the loops and both futures are awaited.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void testLuceneAndSqlMultithreaded() throws Exception {
 +        // ---------- Test parameters ---------- //
 +        int luceneThreads = 10;
 +        int sqlThreads = 10;
 +        long duration = 10 * 1000;
 +        final int logMod = 100;
 +
 +        final Person p1 = new Person("Jon", 1500, "Master");
 +        final Person p2 = new Person("Jane", 2000, "Master");
 +        final Person p3 = new Person("Mike", 1800, "Bachelor");
 +        final Person p4 = new Person("Bob", 1900, "Bachelor");
 +
 +        final IgniteCache<UUID, Person> cache0 = grid(0).jcache(null);
 +
 +        cache0.put(p1.id(), p1);
 +        cache0.put(p2.id(), p2);
 +        cache0.put(p3.id(), p3);
 +        cache0.put(p4.id(), p4);
 +
 +        assertEquals(4, cache0.size());
 +
 +        assert grid(0).nodes().size() == GRID_CNT;
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
 +        final AtomicLong luceneCnt = new AtomicLong();
 +
 +        // Start lucene query threads.
 +        IgniteInternalFuture<?> futLucene = GridTestUtils.runMultiThreadedAsync(new CAX() {
 +            @Override public void applyx() throws IgniteCheckedException {
 +                while (!done.get()) {
 +                    QueryCursor<Cache.Entry<UUID, Person>> master =
 +                        cache0.query(new QueryTextPredicate(Person.class, "Master"));
 +
 +                    Collection<Cache.Entry<UUID, Person>> entries = master.getAll();
 +
 +                    checkResult(entries, p1, p2);
 +
 +                    long cnt = luceneCnt.incrementAndGet();
 +
 +                    if (cnt % logMod == 0)
 +                        info("Executed LUCENE queries: " + cnt);
 +                }
 +            }
 +        }, luceneThreads, "LUCENE-THREAD");
 +
 +        final AtomicLong sqlCnt = new AtomicLong();
 +
 +        // Start sql query threads.
 +        IgniteInternalFuture<?> futSql = GridTestUtils.runMultiThreadedAsync(new CAX() {
 +            @Override public void applyx() throws IgniteCheckedException {
 +                while (!done.get()) {
 +                    QueryCursor<Cache.Entry<UUID, Person>> bachelors =
 +                            cache0.query(new QuerySqlPredicate(Person.class, "degree = 'Bachelor'"));
 +
 +                    Collection<Cache.Entry<UUID, Person>> entries = bachelors.getAll();
 +
 +                    checkResult(entries, p3, p4);
 +
 +                    long cnt = sqlCnt.incrementAndGet();
 +
 +                    if (cnt % logMod == 0)
 +                        info("Executed SQL queries: " + cnt);
 +                }
 +            }
 +        }, sqlThreads, "SQL-THREAD");
 +
 +        Thread.sleep(duration);
 +
 +        done.set(true);
 +
 +        futLucene.get();
 +        futSql.get();
 +    }
 +
 +    /**
 +     * Checks every queried entry: its key must equal the id of its value, and the value must be one of
 +     * the expected persons.
 +     * <p>
 +     * NOTE(review): only the returned entries are inspected; nothing asserts that every expected person
 +     * is present, so an empty result would also pass. Confirm whether that is intended.
 +     *
 +     * @param entries Queried result.
 +     * @param persons Persons that should be in the result.
 +     */
 +    private void checkResult(Iterable<Cache.Entry<UUID, Person>> entries, Person... persons) {
 +        for (Cache.Entry<UUID, Person> entry : entries) {
 +            assertEquals(entry.getKey(), entry.getValue().id());
 +
 +            assert F.<Person>asList(persons).contains(entry.getValue());
 +        }
 +    }
 +
 +    /**
 +     * Test value class. The test stores instances in the cache under their {@link #id()}; the fields
 +     * carry query annotations and the class is serialized through {@link Externalizable}.
 +     */
 +    private static class Person implements Externalizable {
 +        /** Identifier, generated randomly for each new instance; annotated with {@code @GridToStringExclude}. */
 +        @GridToStringExclude
 +        private UUID id = UUID.randomUUID();
 +
 +        /** Name; annotated as an SQL query field. */
 +        @QuerySqlField
 +        private String name;
 +
 +        /** Salary; annotated as an SQL query field. */
 +        @QuerySqlField
 +        private int salary;
 +
 +        /** Degree; annotated as both an SQL query field and a text query field. */
 +        @QuerySqlField
 +        @QueryTextField
 +        private String degree;
 +
 +        /** Required by {@link Externalizable}. */
 +        public Person() {
 +            // No-op.
 +        }
 +
 +        /**
 +         * Creates a person. The arguments are checked with {@code assert}: {@code name} and
 +         * {@code degree} must be non-null and {@code salary} must be positive.
 +         *
 +         * @param name Name.
 +         * @param salary Salary.
 +         * @param degree Degree.
 +         */
 +        Person(String name, int salary, String degree) {
 +            assert name != null;
 +            assert salary > 0;
 +            assert degree != null;
 +
 +            this.name = name;
 +            this.salary = salary;
 +            this.degree = degree;
 +        }
 +
 +        /** @return Id. */
 +        UUID id() {
 +            return id;
 +        }
 +
 +        /** @return Name. */
 +        String name() {
 +            return name;
 +        }
 +
 +        /** @return Salary (the {@code int} field widened to {@code double}). */
 +        double salary() {
 +            return salary;
 +        }
 +
 +        /** @return Degree. */
 +        String degree() {
 +            return degree;
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Writes id, name, salary and degree, in that order.
 +         */
 +        @Override public void writeExternal(ObjectOutput out) throws IOException {
 +            U.writeUuid(out, id);
 +            U.writeString(out, name);
 +            out.writeInt(salary);
 +            U.writeString(out, degree);
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Reads id, name, salary and degree, in the order {@code writeExternal} wrote them.
 +         */
 +        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
 +            id = U.readUuid(in);
 +            name = U.readString(in);
 +            salary = in.readInt();
 +            degree = U.readString(in);
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Combines id, name and salary. {@code degree} is not part of the hash although
 +         * {@code equals} compares it; equal persons still produce equal hash codes.
 +         */
 +        @Override public int hashCode() {
 +            return id.hashCode() + 31 * name.hashCode() + 31 * 31 * salary;
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Two persons are equal when id, name, salary and degree all match.
 +         */
 +        @Override public boolean equals(Object obj) {
 +            if (obj == this)
 +                return true;
 +
 +            if (!(obj instanceof Person))
 +                return false;
 +
 +            Person that = (Person)obj;
 +
 +            return that.id.equals(id) && that.name.equals(name) && that.salary == salary && that.degree.equals(degree);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return S.toString(Person.class, this);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/97a9adc8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapAndSwapSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/97a9adc8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryLoadSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryLoadSelfTest.java
index b0e286b,0000000..623862b
mode 100644,000000..100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryLoadSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryLoadSelfTest.java
@@@ -1,321 -1,0 +1,320 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.cache;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.cache.*;
 +import org.apache.ignite.cache.query.annotations.*;
 +import org.apache.ignite.cache.query.*;
 +import org.apache.ignite.cache.store.*;
 +import org.apache.ignite.configuration.*;
 +import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.cache.query.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.apache.ignite.lang.*;
 +import org.apache.ignite.spi.discovery.tcp.*;
 +import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 +import org.apache.ignite.testframework.junits.common.*;
 +import org.jetbrains.annotations.*;
 +
 +import javax.cache.*;
 +import javax.cache.configuration.*;
 +import java.util.*;
 +
 +import static org.apache.ignite.cache.CacheMode.*;
 +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
 +
 +/**
 + * Test that entries are indexed on load/reload methods.
 + */
 +public class IgniteCacheQueryLoadSelfTest extends GridCommonAbstractTest {
 +    /** IP finder set on the discovery SPI built in {@link #getConfiguration(String)}. */
 +    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 +
 +    /** Number of entries (keys {@code 0} to {@code PUT_CNT - 1}) that {@code TestStore.loadCache} produces. */
 +    private static final int PUT_CNT = 10;
 +
 +    /** Map backing {@link TestStore}: read by {@code load}, updated by {@code write} and {@code delete}. */
 +    private static final Map<Integer, ValueObject> STORE_MAP = new HashMap<>();
 +
 +    /**
 +     * Passes {@code true} to the parent constructor.
 +     * NOTE(review): presumably this makes the base class start a grid automatically (the tests call
 +     * {@code grid()} without starting one themselves); confirm against {@code GridCommonAbstractTest}.
 +     */
 +    public IgniteCacheQueryLoadSelfTest() {
 +        super(true);
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * Adds to the parent configuration a {@code REPLICATED} cache backed by a single {@link TestStore}
 +     * instance (wrapped in a singleton factory) with read-through, write-through and
 +     * load-previous-value enabled and {@code FULL_SYNC} write synchronization, plus a
 +     * {@link TcpDiscoverySpi} that uses {@link #IP_FINDER}.
 +     */
 +    @SuppressWarnings("unchecked")
 +    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
 +        IgniteConfiguration cfg = super.getConfiguration(gridName);
 +
 +        CacheConfiguration ccfg = defaultCacheConfiguration();
 +
 +        ccfg.setCacheMode(REPLICATED);
 +        ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore()));
 +        ccfg.setReadThrough(true);
 +        ccfg.setWriteThrough(true);
 +        ccfg.setLoadPreviousValue(true);
 +        ccfg.setWriteSynchronizationMode(FULL_SYNC);
 +
 +        cfg.setCacheConfiguration(ccfg);
 +
 +        TcpDiscoverySpi disco = new TcpDiscoverySpi();
 +
 +        disco.setIpFinder(IP_FINDER);
 +
 +        cfg.setDiscoverySpi(disco);
 +
 +        return cfg;
 +    }
 +
 +    /**
 +     * {@inheritDoc}
 +     * <p>
 +     * Removes all cache entries, asserts that the cache is empty and that the index reports no
 +     * {@link ValueObject} entries, then clears {@link #STORE_MAP}.
 +     * NOTE(review): {@code super.afterTest()} is not called here; confirm that is intended.
 +     */
 +    @Override protected void afterTest() throws Exception {
 +        cache().removeAll();
 +
 +        assert cache().isEmpty();
 +        assert size(ValueObject.class) == 0;
 +
 +        STORE_MAP.clear();
 +    }
 +
 +    /**
 +     * Number of objects of given type in index, as reported by the query manager taken from the
 +     * context of the grid's internal cache.
 +     *
 +     * @param cls Value type.
 +     * @return Objects number.
 +     * @throws IgniteCheckedException If failed.
 +     */
 +    private long size(Class<?> cls) throws IgniteCheckedException {
 +        GridCacheQueryManager<Object, Object> qryMgr = ((IgniteKernal)grid()).internalCache().context().queries();
 +
 +        assert qryMgr != null;
 +
 +        return qryMgr.size(cls);
 +    }
 +
 +    /**
 +     * Calls {@code loadCache(null, 0)} and checks that the cache size, the result of the SQL query
 +     * {@code val >= 0} and the index size for {@link ValueObject} all equal {@link #PUT_CNT}.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    public void testLoadCache() throws Exception {
 +        IgniteCache<Integer, ValueObject> cache = grid().jcache(null);
 +
 +        cache.loadCache(null, 0);
 +
 +        assert cache.size() == PUT_CNT;
 +
 +        Collection<Cache.Entry<Integer, ValueObject>> res =
 +            cache.query(new QuerySqlPredicate(ValueObject.class, "val >= 0")).getAll();
 +
 +        assertNotNull(res);
 +        assertEquals(PUT_CNT, res.size());
 +        assertEquals(PUT_CNT, size(ValueObject.class));
 +    }
 +
 +    /**
 +     * Same checks as {@link #testLoadCache()}, but {@code loadCache(null, 0)} is issued through
 +     * {@code withAsync()} and the test waits on {@code future().get()} before asserting.
 +     * <p>
 +     * NOTE(review): {@code future()} is called on {@code cache}, not on the object returned by
 +     * {@code withAsync()}; confirm this is the intended use of the async API.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    public void testLoadCacheAsync() throws Exception {
 +        IgniteCache<Integer, ValueObject> cache = grid(0).jcache(null);
 +
 +        cache.withAsync().loadCache(null, 0);
 +
 +        cache.future().get();
 +
 +        assert cache.size() == PUT_CNT;
 +
 +        Collection<Cache.Entry<Integer, ValueObject>> res =
 +            cache.query(new QuerySqlPredicate(ValueObject.class, "val >= 0")).getAll();
 +
 +        assert res != null;
 +        assert res.size() == PUT_CNT;
 +        assert size(ValueObject.class) == PUT_CNT;
 +    }
 +
 +    /**
 +     * Calls {@code loadCache} with a predicate that accepts only keys {@code >= 5} and checks that the
 +     * cache size, the result of the SQL query {@code val >= 0} and the index size all equal
 +     * {@code PUT_CNT - 5}.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    public void testLoadCacheFiltered() throws Exception {
 +        IgniteCache<Integer, ValueObject> cache = grid().jcache(null);
 +
 +        cache.loadCache(new P2<Integer, ValueObject>() {
 +            @Override public boolean apply(Integer key, ValueObject val) {
 +                return key >= 5;
 +            }
 +        }, 0);
 +
 +        assert cache.size() == PUT_CNT - 5;
 +
 +        Collection<Cache.Entry<Integer, ValueObject>> res =
 +            cache.query(new QuerySqlPredicate(ValueObject.class, "val >= 0")).getAll();
 +
 +        assert res != null;
 +        assert res.size() == PUT_CNT - 5;
 +        assert size(ValueObject.class) == PUT_CNT - 5;
 +    }
 +
 +    /**
 +     * Same checks as {@link #testLoadCacheFiltered()}, but the filtered {@code loadCache} is issued
 +     * through {@code withAsync()} and the test waits on {@code future().get()} before asserting.
 +     * <p>
 +     * NOTE(review): {@code future()} is called on {@code cache}, not on the object returned by
 +     * {@code withAsync()}; confirm this is the intended use of the async API.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    public void testLoadCacheAsyncFiltered() throws Exception {
 +        IgniteCache<Integer, ValueObject> cache = grid().jcache(null);
 +
 +        cache.withAsync().loadCache(new P2<Integer, ValueObject>() {
 +            @Override public boolean apply(Integer key, ValueObject val) {
 +                return key >= 5;
 +            }
 +        }, 0);
 +
 +        cache.future().get();
 +
 +        assert cache.size() == PUT_CNT - 5;
 +
 +        Collection<Cache.Entry<Integer, ValueObject>> res =
 +            cache.query(new QuerySqlPredicate(ValueObject.class, "val >= 0")).getAll();
 +
 +        assert res != null;
 +        assert res.size() == PUT_CNT - 5;
 +        assert size(ValueObject.class) == PUT_CNT - 5;
 +    }
 +
 +    /**
 +     * Puts {@code ValueObject(1)} under key 1 directly into {@link #STORE_MAP}, reloads key 1 with
 +     * {@code reloadAsync} and checks that the reloaded value is 1 and that the cache size, the result
 +     * of the SQL query {@code val >= 0} and the index size are all 1.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    public void testReloadAsync() throws Exception {
 +        STORE_MAP.put(1, new ValueObject(1));
 +
 +        GridCache<Integer, ValueObject> cache = cache();
 +
 +        assert cache.reloadAsync(1).get().value() == 1;
 +
 +        assert cache.size() == 1;
 +
 +        Collection<Map.Entry<Integer, ValueObject>> res =
 +            cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
 +
 +        assert res != null;
 +        assert res.size() == 1;
 +        assert size(ValueObject.class) == 1;
 +    }
 +
 +    /**
 +     * Fills {@link #STORE_MAP} with keys {@code 0} to {@code PUT_CNT - 1}, reloads keys {@code 5} to
 +     * {@code PUT_CNT - 1} with {@code reloadAll} and checks that the cache size, the result of the SQL
 +     * query {@code val >= 0} and the index size all equal {@code PUT_CNT - 5}. The cache is then
 +     * cleared and asserted empty, the same keys are reloaded and the same three checks are repeated.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    public void testReloadAll() throws Exception {
 +        for (int i = 0; i < PUT_CNT; i++)
 +            STORE_MAP.put(i, new ValueObject(i));
 +
 +        GridCache<Integer, ValueObject> cache = cache();
 +
 +        Integer[] keys = new Integer[PUT_CNT - 5];
 +
 +        for (int i = 0; i < PUT_CNT - 5; i++)
 +            keys[i] = i + 5;
 +
 +        cache.reloadAll(F.asList(keys));
 +
 +        assert cache.size() == PUT_CNT - 5;
 +
 +        Collection<Map.Entry<Integer, ValueObject>> res =
 +            cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
 +
 +        assert res != null;
 +        assert res.size() == PUT_CNT - 5;
 +        assert size(ValueObject.class) == PUT_CNT - 5;
 +
-         for (Integer key : keys)
-             cache.clear(key);
++        cache.clear();
 +
 +        assert cache.isEmpty();
 +        assertEquals(0, cache.size());
 +
 +        cache.reloadAll(Arrays.asList(keys));
 +
 +        assertEquals(PUT_CNT - 5, cache.size());
 +
 +        res = cache.queries().createSqlQuery(ValueObject.class, "val >= 0").execute().get();
 +
 +        assert res != null;
 +        assert res.size() == PUT_CNT - 5;
 +        assert size(ValueObject.class) == PUT_CNT - 5;
 +    }
 +
 +    /**
 +     * Test store. {@code load}, {@code write} and {@code delete} operate on the static
 +     * {@code STORE_MAP}; {@code loadCache} does not touch the map and generates its entries itself.
 +     */
 +    private static class TestStore extends CacheStoreAdapter<Integer, ValueObject> {
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Passes {@code PUT_CNT} newly created entries ({@code i -> new ValueObject(i)}) to the
 +         * closure; {@code args} is not used.
 +         */
 +        @Override public void loadCache(IgniteBiInClosure<Integer, ValueObject> clo, @Nullable Object... args) {
 +            assert clo != null;
 +
 +            for (int i = 0; i < PUT_CNT; i++)
 +                clo.apply(i, new ValueObject(i));
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Returns whatever {@code STORE_MAP} holds for the key, which is {@code null} when absent.
 +         */
 +        @Override public ValueObject load(Integer key) {
 +            assert key != null;
 +
 +            return STORE_MAP.get(key);
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Asserts that the entry, its key and its value are non-null, then stores the value under the
 +         * key in {@code STORE_MAP}.
 +         */
 +        @Override public void write(javax.cache.Cache.Entry<? extends Integer, ? extends ValueObject> e) {
 +            assert e != null;
 +            assert e.getKey() != null;
 +            assert e.getValue() != null;
 +
 +            STORE_MAP.put(e.getKey(), e.getValue());
 +        }
 +
 +        /**
 +         * {@inheritDoc}
 +         * <p>
 +         * Removes the key from {@code STORE_MAP}.
 +         */
 +        @Override public void delete(Object key) {
 +            assert key != null;
 +
 +            STORE_MAP.remove(key);
 +        }
 +    }
 +
 +    /**
 +     * Value object class: wraps a single {@code int} that is annotated as an SQL query field, which
 +     * is the field the tests' {@code val >= 0} queries refer to.
 +     */
 +    private static class ValueObject {
 +        /** Value; annotated as an SQL query field. */
 +        @QuerySqlField
 +        private final int val;
 +
 +        /**
 +         * @param val Value.
 +         */
 +        ValueObject(int val) {
 +            this.val = val;
 +        }
 +
 +        /**
 +         * @return Value.
 +         */
 +        int value() {
 +            return val;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return S.toString(ValueObject.class, this);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/97a9adc8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
index 44a547b,0000000..1ee45d1
mode 100644,000000..100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java
@@@ -1,752 -1,0 +1,762 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.cache;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.cache.*;
 +import org.apache.ignite.cache.eviction.lru.*;
 +import org.apache.ignite.cache.query.*;
 +import org.apache.ignite.cache.query.annotations.*;
 +import org.apache.ignite.configuration.*;
 +import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.cache.query.*;
 +import org.apache.ignite.internal.processors.query.*;
 +import org.apache.ignite.internal.processors.query.h2.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.marshaller.optimized.*;
 +import org.apache.ignite.spi.discovery.tcp.*;
 +import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 +import org.apache.ignite.spi.swapspace.file.*;
 +import org.apache.ignite.testframework.junits.common.*;
 +import org.jetbrains.annotations.*;
 +
 +import javax.cache.*;
 +import java.io.*;
 +import java.util.*;
 +import java.util.concurrent.atomic.*;
 +
 +import static org.apache.ignite.cache.CacheAtomicityMode.*;
 +import static org.apache.ignite.cache.CacheMode.*;
 +
 +/**
 + * Multi-threaded tests for cache queries.
 + */
 +@SuppressWarnings("StatementWithEmptyBody")
 +public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTest {
 +    /** */
 +    private static final boolean TEST_INFO = true;
 +
 +    /** Number of test grids (nodes). Should not be less than 2. */
 +    private static final int GRID_CNT = 2;
 +
 +    /** */
 +    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 +
 +    /** */
 +    private static AtomicInteger idxSwapCnt = new AtomicInteger();
 +
 +    /** */
 +    private static AtomicInteger idxUnswapCnt = new AtomicInteger();
 +
 +    /** */
 +    private static final long DURATION = 30 * 1000;
 +
 +    /** Don't start grid by default. */
 +    public IgniteCacheQueryMultiThreadedSelfTest() {
 +        super(false);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
 +        IgniteConfiguration cfg = super.getConfiguration(gridName);
 +
 +        TcpDiscoverySpi disco = new TcpDiscoverySpi();
 +
 +        disco.setIpFinder(ipFinder);
 +
 +        cfg.setDiscoverySpi(disco);
 +
 +        cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
 +        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
 +
 +        CacheConfiguration cacheCfg = defaultCacheConfiguration();
 +
 +        cacheCfg.setCacheMode(PARTITIONED);
 +        cacheCfg.setAtomicityMode(TRANSACTIONAL);
 +        cacheCfg.setDistributionMode(CacheDistributionMode.NEAR_PARTITIONED);
 +        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 +        cacheCfg.setSwapEnabled(true);
 +        cacheCfg.setBackups(1);
 +        cacheCfg.setEvictionPolicy(evictsEnabled() ? new CacheLruEvictionPolicy(100) : null);
 +
 +        CacheQueryConfiguration qcfg = new CacheQueryConfiguration();
 +
 +        qcfg.setIndexPrimitiveKey(true);
 +
 +        cacheCfg.setQueryConfiguration(qcfg);
 +
 +        if (offheapEnabled() && evictsEnabled())
 +            cacheCfg.setOffHeapMaxMemory(1000); // Small offheap for evictions.
 +
 +        cfg.setCacheConfiguration(cacheCfg);
 +
 +        IgniteQueryConfiguration indexing = new IgniteQueryConfiguration();
 +
 +        indexing.setMaxOffheapRowsCacheSize(128);
 +
 +        if (offheapEnabled())
 +            indexing.setMaxOffHeapMemory(0);
 +
 +        cfg.setQueryConfiguration(indexing);
 +
 +        GridQueryProcessor.idxCls = FakeIndexing.class;
 +
 +        return cfg;
 +    }
 +
 +    /**
 +     * H2 indexing that counts swap and unswap notifications.
 +     */
 +    private static class FakeIndexing extends IgniteH2Indexing {
 +        @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteCheckedException {
 +            super.onSwap(spaceName, key);
 +
 +            idxSwapCnt.incrementAndGet();
 +        }
 +
 +        @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes)
 +        throws IgniteCheckedException {
 +            super.onUnswap(spaceName, key, val, valBytes);
 +
 +            idxUnswapCnt.incrementAndGet();
 +        }
 +    }
 +
 +    /** @return {@code true} If offheap enabled. */
 +    protected boolean offheapEnabled() {
 +        return false;
 +    }
 +
 +    /** @return {@code true} If evictions enabled. */
 +    protected boolean evictsEnabled() {
 +        return true;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void beforeTest() throws Exception {
 +        super.beforeTest();
 +
 +        // Verify that all caches are empty before the test starts.
 +        for (int i = 0; i < GRID_CNT; i++) {
 +            GridCache<Object, Object> c = grid(i).cache(null);
 +
 +            assertEquals(0, c.size());
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void beforeTestsStarted() throws Exception {
 +        assert GRID_CNT >= 2 : "Constant GRID_CNT must be greater than or equal to 2.";
 +
 +        startGridsMultiThreaded(GRID_CNT);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void afterTestsStopped() throws Exception {
 +        stopAllGrids();
 +
 +        if (evictsEnabled()) {
 +            assertTrue(idxSwapCnt.get() > 0);
 +            assertTrue(idxUnswapCnt.get() > 0);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void afterTest() throws Exception {
 +        super.afterTest();
 +
 +        // Clean up all caches.
 +        for (int i = 0; i < GRID_CNT; i++) {
 +            GridCache<Object, Object> c = grid(i).cache(null);
 +
-             c.removeAll(F.<CacheEntry<Object, Object>>alwaysTrue());
++            c.removeAll();
++
++            // Fix for tests where a mapping was removed at the primary node
++            // but was not removed at the others.
++            // removeAll() removes a mapping only when it is present at a primary node.
++            // To remove all mappings, force removal by key.
++            if (c.size() > 0) {
++                for (Object k : c.keySet()) {
++                    c.remove(k);
++                }
++            }
 +
 +            Iterator<Map.Entry<Object, Object>> it = c.swapIterator();
 +
 +            while (it.hasNext()) {
 +                it.next();
 +
 +                it.remove();
 +            }
 +
 +            it = c.offHeapIterator();
 +
 +            while (it.hasNext()) {
 +                it.next();
 +
 +                it.remove();
 +            }
 +
 +            assertEquals("Swap keys: " + c.swapKeys(), 0, c.swapKeys());
 +            assertEquals(0, c.offHeapEntriesCount());
 +            assertEquals(0, c.size());
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void info(String msg) {
 +        if (TEST_INFO)
 +            super.info(msg);
 +    }
 +
 +    /**
 +     * @param entries Entries.
 +     * @param g Grid.
 +     * @return Affinity nodes.
 +     */
 +    private Set<UUID> affinityNodes(Iterable<Cache.Entry<Integer, Integer>> entries, Ignite g) {
 +        Set<UUID> nodes = new HashSet<>();
 +
 +        for (Cache.Entry<Integer, Integer> entry : entries)
 +            nodes.add(g.cache(null).affinity().mapKeyToPrimaryAndBackups(entry.getKey()).iterator().next().id());
 +
 +        return nodes;
 +    }
 +
 +    /**
 +     * TODO
 +     *
 +     * JUnit.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void _testMultiThreadedSwapUnswapString() throws Exception {
 +        int threadCnt = 150;
 +        final int keyCnt = 2000;
 +        final int valCnt = 10000;
 +
 +        final Ignite g = grid(0);
 +
 +        // Put test values into cache.
 +        final IgniteCache<Integer, String> c = g.jcache(null);
 +        final IgniteCache<Integer, Long> cl = g.jcache(null);
 +
 +        assertEquals(0, g.cache(null).size());
 +        assertEquals(0, c.query(new QuerySqlPredicate(String.class, "1 = 1")).getAll().size());
 +        assertEquals(0, cl.query(new QuerySqlPredicate(Long.class, "1 = 1")).getAll().size());
 +
 +        Random rnd = new Random();
 +
 +        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) {
 +            c.put(i, String.valueOf(rnd.nextInt(valCnt)));
 +
 +            if (evictsEnabled() && rnd.nextBoolean())
 +                c.localEvict(Arrays.asList(i));
 +        }
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
 +        IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() {
 +            @Override public void applyx() throws IgniteCheckedException {
 +                Random rnd = new Random();
 +
 +                while (!done.get()) {
 +                    switch (rnd.nextInt(5)) {
 +                        case 0:
 +                            c.put(rnd.nextInt(keyCnt), String.valueOf(rnd.nextInt(valCnt)));
 +
 +                            break;
 +                        case 1:
 +                            if (evictsEnabled())
 +                                c.localEvict(Arrays.asList(rnd.nextInt(keyCnt)));
 +
 +                            break;
 +                        case 2:
 +                            c.remove(rnd.nextInt(keyCnt));
 +
 +                            break;
 +                        case 3:
 +                            c.get(rnd.nextInt(keyCnt));
 +
 +                            break;
 +                        case 4:
 +                            int from = rnd.nextInt(valCnt);
 +
 +                            QueryCursor<Cache.Entry<Integer, String>> qry = c.query(
 +                                    new QuerySqlPredicate(String.class, "_val between ? and ?", String.valueOf(from),
 +                                            String.valueOf(from + 250)));
 +
 +                            Collection<Cache.Entry<Integer, String>> res = qry.getAll();
 +
 +                            for (Cache.Entry<Integer, String> ignored : res) {
 +                                //No-op.
 +                            }
 +                    }
 +                }
 +            }
 +        }, threadCnt);
 +
 +        Thread.sleep(DURATION);
 +
 +        done.set(true);
 +
 +        fut.get();
 +    }
 +
 +    /**
 +     * TODO
 +     *
 +     * JUnit.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void _testMultiThreadedSwapUnswapLong() throws Exception {
 +        int threadCnt = 150;
 +        final int keyCnt = 2000;
 +        final int valCnt = 10000;
 +
 +        final Ignite g = grid(0);
 +
 +        // Put test values into cache.
 +        final IgniteCache<Integer, Long> c = g.jcache(null);
 +        final IgniteCache<Integer, String> c1 = g.jcache(null);
 +
 +        assertEquals(0, g.cache(null).size());
 +        assertEquals(0, c1.query(new QuerySqlPredicate(String.class, "1 = 1")).getAll().size());
 +        assertEquals(0, c.query(new QuerySqlPredicate(Long.class, "1 = 1")).getAll().size());
 +
 +        Random rnd = new Random();
 +
 +        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) {
 +            c.put(i, (long) rnd.nextInt(valCnt));
 +
 +            if (evictsEnabled() && rnd.nextBoolean())
 +                c.localEvict(Arrays.asList(i));
 +        }
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
 +        IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() {
 +            @Override public void applyx() throws IgniteCheckedException {
 +                Random rnd = new Random();
 +
 +                while (!done.get()) {
 +                    int key = rnd.nextInt(keyCnt);
 +
 +                    switch (rnd.nextInt(5)) {
 +                        case 0:
 +                            c.put(key, (long) rnd.nextInt(valCnt));
 +
 +                            break;
 +                        case 1:
 +                            if (evictsEnabled())
 +                                c.localEvict(Arrays.asList(key));
 +
 +                            break;
 +                        case 2:
 +                            c.remove(key);
 +
 +                            break;
 +                        case 3:
 +                            c.get(key);
 +
 +                            break;
 +                        case 4:
 +                            int from = rnd.nextInt(valCnt);
 +
 +                            Collection<Cache.Entry<Integer, Long>> res = c.query(new QuerySqlPredicate(Long.class,
 +                                "_val between ? and ?", from, from + 250)).getAll();
 +
 +                            for (Cache.Entry<Integer, Long> ignored : res) {
 +                                //No-op.
 +                            }
 +                    }
 +                }
 +            }
 +        }, threadCnt);
 +
 +        Thread.sleep(DURATION);
 +
 +        done.set(true);
 +
 +        fut.get();
 +    }
 +
 +    /**
 +     * TODO
 +     *
 +     * JUnit.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void _testMultiThreadedSwapUnswapLongString() throws Exception {
 +        int threadCnt = 150;
 +        final int keyCnt = 2000;
 +        final int valCnt = 10000;
 +
 +        final Ignite g = grid(0);
 +
 +        // Put test values into cache.
 +        final IgniteCache<Integer, Object> c = g.jcache(null);
 +
 +        assertEquals(0, g.jcache(null).size());
 +        assertEquals(0, c.query(new QuerySqlPredicate(Object.class, "1 = 1")).getAll().size());
 +
 +        Random rnd = new Random();
 +
 +        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) {
 +            c.put(i, rnd.nextBoolean() ? (long) rnd.nextInt(valCnt) : String.valueOf(rnd.nextInt(valCnt)));
 +
 +            if (evictsEnabled() && rnd.nextBoolean())
 +                c.localEvict(Arrays.asList(i));
 +        }
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
 +        IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() {
 +            @Override public void applyx() throws IgniteCheckedException {
 +                Random rnd = new Random();
 +
 +                while (!done.get()) {
 +                    int key = rnd.nextInt(keyCnt);
 +
 +                    switch (rnd.nextInt(5)) {
 +                        case 0:
 +                            c.put(key, rnd.nextBoolean() ? (long) rnd.nextInt(valCnt) :
 +                                    String.valueOf(rnd.nextInt(valCnt)));
 +
 +                            break;
 +                        case 1:
 +                            if (evictsEnabled())
 +                                c.localEvict(Arrays.asList(key));
 +
 +                            break;
 +                        case 2:
 +                            c.remove(key);
 +
 +                            break;
 +                        case 3:
 +                            c.get(key);
 +
 +                            break;
 +                        case 4:
 +                            int from = rnd.nextInt(valCnt);
 +
 +                            Collection<Cache.Entry<Integer, Object>> res = c.query(
 +                                new QuerySqlPredicate(Object.class, "_val between ? and ?", from, from + 250))
 +                                .getAll();
 +
 +                            for (Cache.Entry<Integer, Object> ignored : res) {
 +                                //No-op.
 +                            }
 +                    }
 +                }
 +            }
 +        }, threadCnt);
 +
 +        Thread.sleep(DURATION);
 +
 +        done.set(true);
 +
 +        fut.get();
 +    }
 +
 +    /**
 +     * TODO
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void _testMultiThreadedSwapUnswapObject() throws Exception {
 +        int threadCnt = 50;
 +        final int keyCnt = 4000;
 +        final int valCnt = 10000;
 +
 +        final Ignite g = grid(0);
 +
 +        // Put test values into cache.
 +        final IgniteCache<Integer, TestValue> c = g.jcache(null);
 +
 +        assertEquals(0, g.cache(null).size());
 +        assertEquals(0, c.query(new QuerySqlPredicate(TestValue.class, "1 = 1")).getAll().size());
 +
 +        Random rnd = new Random();
 +
 +        for (int i = 0; i < keyCnt; i += 1 + rnd.nextInt(3)) {
 +            c.put(i, new TestValue(rnd.nextInt(valCnt)));
 +
 +            if (evictsEnabled() && rnd.nextBoolean())
 +                c.localEvict(Arrays.asList(i));
 +        }
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
 +        IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() {
 +            @Override public void applyx() throws IgniteCheckedException {
 +                Random rnd = new Random();
 +
 +                while (!done.get()) {
 +                    int key = rnd.nextInt(keyCnt);
 +
 +                    switch (rnd.nextInt(5)) {
 +                        case 0:
 +                            c.put(key, new TestValue(rnd.nextInt(valCnt)));
 +
 +                            break;
 +                        case 1:
 +                            if (evictsEnabled())
 +                                c.localEvict(Arrays.asList(key));
 +
 +                            break;
 +                        case 2:
 +                            c.remove(key);
 +
 +                            break;
 +                        case 3:
 +                            c.get(key);
 +
 +                            break;
 +                        case 4:
 +                            int from = rnd.nextInt(valCnt);
 +
 +                            Collection<Cache.Entry<Integer, TestValue>> res =
 +                                c.query(new QuerySqlPredicate(TestValue.class, "TestValue.val between ? and ?",
 +                                    from, from + 250)).getAll();
 +
 +                            for (Cache.Entry<Integer, TestValue> ignored : res) {
 +                                //No-op.
 +                            }
 +                    }
 +                }
 +            }
 +        }, threadCnt);
 +
 +        Thread.sleep(DURATION);
 +
 +        done.set(true);
 +
 +        fut.get();
 +    }
 +
 +    /**
 +     * JUnit.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void testMultiThreadedSameQuery() throws Exception {
 +        int threadCnt = 50;
 +        final int keyCnt = 10;
 +        final int logMod = 5000;
 +
 +        final Ignite g = grid(0);
 +
 +        // Put test values into cache.
 +        final IgniteCache<Integer, Integer> c = g.jcache(null);
 +
 +        for (int i = 0; i < keyCnt; i++) {
 +            c.put(i, i);
 +
 +            c.localEvict(Arrays.asList(i));
 +        }
 +
 +        final AtomicInteger cnt = new AtomicInteger();
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
 +        IgniteInternalFuture<?> fut = multithreadedAsync(
 +            new CAX() {
 +                @Override public void applyx() throws IgniteCheckedException {
 +                    int iter = 0;
 +
 +                    while (!done.get() && !Thread.currentThread().isInterrupted()) {
 +                        iter++;
 +
 +                        Collection<Cache.Entry<Integer, Integer>> entries =
 +                            c.query(new QuerySqlPredicate(Integer.class, "_val >= 0")).getAll();
 +
 +                        assert entries != null;
 +
 +                        assertEquals("Query results [entries=" + entries + ", aff=" + affinityNodes(entries, g) +
 +                            ", iteration=" + iter + ']', keyCnt, entries.size());
 +
 +                        if (cnt.incrementAndGet() % logMod == 0) {
 +                            GridCacheQueryManager<Object, Object> qryMgr =
 +                                ((IgniteKernal)g).internalCache().context().queries();
 +
 +                            assert qryMgr != null;
 +
 +                            qryMgr.printMemoryStats();
 +                        }
 +                    }
 +                }
 +            }, threadCnt);
 +
 +        Thread.sleep(DURATION);
 +
 +        info("Finishing test...");
 +
 +        done.set(true);
 +
 +        fut.get();
 +    }
 +
 +    /**
 +     * JUnit.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void testMultiThreadedNewQueries() throws Exception {
 +        int threadCnt = 50;
 +        final int keyCnt = 10;
 +        final int logMod = 5000;
 +
 +        final Ignite g = grid(0);
 +
 +        // Put test values into cache.
 +        final IgniteCache<Integer, Integer> c = g.jcache(null);
 +
 +        for (int i = 0; i < keyCnt; i++) {
 +            c.put(i, i);
 +
 +            c.localEvict(Arrays.asList(i));
 +        }
 +
 +        final AtomicInteger cnt = new AtomicInteger();
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
 +        IgniteInternalFuture<?> fut = multithreadedAsync(new CAX() {
 +            @Override public void applyx() throws IgniteCheckedException {
 +                int iter = 0;
 +
 +                while (!done.get() && !Thread.currentThread().isInterrupted()) {
 +                    iter++;
 +
 +                    Collection<Cache.Entry<Integer, Integer>> entries =
 +                        c.query(new QuerySqlPredicate(Integer.class, "_val >= 0")).getAll();
 +
 +                    assert entries != null;
 +
 +                    assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size());
 +
 +                    if (cnt.incrementAndGet() % logMod == 0) {
 +                        GridCacheQueryManager<Object, Object> qryMgr =
 +                            ((IgniteKernal)g).internalCache().context().queries();
 +
 +                        assert qryMgr != null;
 +
 +                        qryMgr.printMemoryStats();
 +                    }
 +                }
 +            }
 +        }, threadCnt);
 +
 +        Thread.sleep(DURATION);
 +
 +        done.set(true);
 +
 +        fut.get();
 +    }
 +
 +    /**
 +     * JUnit.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    @SuppressWarnings({"TooBroadScope"})
 +    public void testMultiThreadedScanQuery() throws Exception {
 +        int threadCnt = 50;
 +        final int keyCnt = 500;
 +        final int logMod = 5000;
 +
 +        final Ignite g = grid(0);
 +
 +        // Put test values into cache.
 +        final IgniteCache<Integer, Integer> c = g.jcache(null);
 +
 +        for (int i = 0; i < keyCnt; i++)
 +            c.put(i, i);
 +
 +        final AtomicInteger cnt = new AtomicInteger();
 +
 +        final AtomicBoolean done = new AtomicBoolean();
 +
 +        IgniteInternalFuture<?> fut = multithreadedAsync(
 +            new CAX() {
 +                @Override public void applyx() throws IgniteCheckedException {
 +                    int iter = 0;
 +
 +                    while (!done.get() && !Thread.currentThread().isInterrupted()) {
 +                        iter++;
 +
 +                        // Scan query.
 +                        Collection<Cache.Entry<Integer, Integer>> entries =
 +                            c.query(new QueryScanPredicate<Integer, Integer>()).getAll();
 +
 +                        assert entries != null;
 +
 +                        assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size());
 +
 +                        if (cnt.incrementAndGet() % logMod == 0) {
 +                            GridCacheQueryManager<Object, Object> qryMgr =
 +                                ((IgniteKernal)g).internalCache().context().queries();
 +
 +                            assert qryMgr != null;
 +
 +                            qryMgr.printMemoryStats();
 +                        }
 +                    }
 +                }
 +            }, threadCnt);
 +
 +        Thread.sleep(DURATION);
 +
 +        done.set(true);
 +
 +        fut.get();
 +    }
 +
 +    /**
 +     * Test value.
 +     */
 +    private static class TestValue implements Serializable {
 +        /** Value. */
 +        @QuerySqlField
 +        private int val;
 +
 +        /**
 +         * @param val Value.
 +         */
 +        private TestValue(int val) {
 +            this.val = val;
 +        }
 +
 +        /**
 +         * @return Value.
 +         */
 +        public int value() {
 +            return val;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/97a9adc8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java
index eede5a7,0000000..773d49e
mode 100644,000000..100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java
@@@ -1,579 -1,0 +1,579 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.cache.distributed.replicated;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.cache.*;
 +import org.apache.ignite.cache.query.*;
 +import org.apache.ignite.cache.query.annotations.*;
 +import org.apache.ignite.events.*;
 +import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.cache.*;
 +import org.apache.ignite.internal.processors.cache.query.*;
 +import org.apache.ignite.internal.util.future.*;
 +import org.apache.ignite.internal.util.lang.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.apache.ignite.lang.*;
 +import org.apache.ignite.testframework.*;
 +import org.apache.ignite.transactions.*;
 +import org.springframework.util.*;
 +
 +import javax.cache.*;
 +import java.io.*;
 +import java.lang.reflect.*;
 +import java.sql.*;
 +import java.util.*;
 +import java.util.concurrent.*;
 +
 +import static org.apache.ignite.cache.CacheMode.*;
 +import static org.apache.ignite.events.IgniteEventType.*;
 +
 +/**
 + * Tests replicated query.
 + */
 +public class IgniteCacheReplicatedQuerySelfTest extends IgniteCacheAbstractQuerySelfTest {
 +    /** Enables debug printing of key/value (de)serialization. */
 +    private static final boolean TEST_DEBUG = false;
 +
 +    /** Grid1. */
 +    private static Ignite ignite1;
 +
 +    /** Grid2. */
 +    private static Ignite ignite2;
 +
 +    /** Grid3. */
 +    private static Ignite ignite3;
 +
 +    /** Cache1. */
 +    private static IgniteCache<CacheKey, CacheValue> cache1;
 +
 +    /** Cache2. */
 +    private static IgniteCache<CacheKey, CacheValue> cache2;
 +
 +    /** Cache3. */
 +    private static IgniteCache<CacheKey, CacheValue> cache3;
 +
 +    /** Key serialization count. */
 +    private static volatile int keySerCnt;
 +
 +    /** Key deserialization count. */
 +    private static volatile int keyDesCnt;
 +
 +    /** Value serialization count. */
 +    private static volatile int valSerCnt;
 +
 +    /** Value deserialization count. */
 +    private static volatile int valDesCnt;
 +
 +    /** {@inheritDoc} */
 +    @Override protected int gridCount() {
 +        return 3;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected CacheMode cacheMode() {
 +        return REPLICATED;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void beforeTest() throws Exception {
 +        super.beforeTest();
 +
 +        ignite1 = grid(0);
 +        ignite2 = grid(1);
 +        ignite3 = grid(2);
 +
 +        cache1 = ignite1.jcache(null);
 +        cache2 = ignite2.jcache(null);
 +        cache3 = ignite3.jcache(null);
 +    }
 +
 +    /**
 +     * @throws Exception If failed.
 +     */
 +    public void testClientOnlyNode() throws Exception {
 +        try {
 +            Ignite g = startGrid("client");
 +
 +            IgniteCache<Integer, Integer> c = g.jcache(null);
 +
 +            for (int i = 0; i < 10; i++)
 +                c.put(i, i);
 +
 +            // Client cache should be empty.
 +            assertEquals(0, c.size());
 +
 +            Collection<Cache.Entry<Integer, Integer>> res =
 +                c.query(new QuerySqlPredicate(Integer.class, "_key >= 5 order by _key")).getAll();
 +
 +            assertEquals(5, res.size());
 +
 +            int i = 5;
 +
 +            for (Cache.Entry<Integer, Integer> e : res) {
 +                assertEquals(i, e.getKey().intValue());
 +                assertEquals(i, e.getValue().intValue());
 +
 +                i++;
 +            }
 +        }
 +        finally {
 +            stopGrid("client");
 +        }
 +    }
 +
 +    /**
 +     * Checks that iterating over a query cursor returns the entries from all 3 nodes.
 +     *
 +     * @throws Exception If failed.
 +     */
 +    public void testIterator() throws Exception {
 +        int keyCnt = 100;
 +
 +        for (int i = 0; i < keyCnt; i++)
 +            cache1.put(new CacheKey(i), new CacheValue("val" + i));
 +
 +        assertEquals(keyCnt, cache1.size());
 +        assertEquals(keyCnt, cache2.size());
 +        assertEquals(keyCnt, cache3.size());
 +
 +        QueryCursor<Cache.Entry<CacheKey, CacheValue>> qry =
 +            cache1.query(new QuerySqlPredicate(CacheValue.class, "true"));
 +
 +        Iterator<Cache.Entry<CacheKey, CacheValue>> iter = qry.iterator();
 +
 +        assert iter.hasNext();
 +
 +        int cnt = 0;
 +
 +        while (iter.hasNext()) {
 +            iter.next();
 +
 +            cnt++;
 +        }
 +
 +        // Expect duplicates since we run query on full projection of 3 nodes and dedup flag is false.
 +        assertEquals(keyCnt * 3, cnt);
 +    }
 +
 +    /**
 +     * @throws Exception If test failed.
 +     */
 +    public void testLocalQuery() throws Exception {
-         cache1.removeAll();
++        cache1.clear();
 +
 +        IgniteTx tx = ignite1.transactions().txStart();
 +
 +        try {
 +            cache1.put(new CacheKey(1), new CacheValue("1"));
 +            cache1.put(new CacheKey(2), new CacheValue("2"));
 +            cache1.put(new CacheKey(3), new CacheValue("3"));
 +            cache1.put(new CacheKey(4), new CacheValue("4"));
 +
 +            tx.commit();
 +
 +            info("Committed transaction: " + tx);
 +        }
 +        catch (IgniteException e) {
 +            tx.rollback();
 +
 +            throw e;
 +        }
 +
 +        checkQueryResults(cache1);
 +        checkQueryResults(cache2);
 +        checkQueryResults(cache3);
 +    }
 +
 +    /**
 +     * @throws Exception If test failed.
 +     */
 +    public void testDistributedQuery() throws Exception {
 +        int keyCnt = 4;
 +
 +        final CountDownLatch latch = new CountDownLatch(keyCnt * 2);
 +
 +        IgnitePredicate<IgniteEvent> lsnr = new IgnitePredicate<IgniteEvent>() {
 +            @Override public boolean apply(IgniteEvent evt) {
 +                latch.countDown();
 +
 +                return true;
 +            }
 +        };
 +
 +        ignite2.events().localListen(lsnr, IgniteEventType.EVT_CACHE_OBJECT_PUT);
 +        ignite3.events().localListen(lsnr, IgniteEventType.EVT_CACHE_OBJECT_PUT);
 +
 +        IgniteTx tx = ignite1.transactions().txStart();
 +
 +        try {
 +            for (int i = 1; i <= keyCnt; i++)
 +                cache1.put(new CacheKey(i), new CacheValue(String.valueOf(i)));
 +
 +            tx.commit();
 +
 +            info("Committed transaction: " + tx);
 +        }
 +        catch (IgniteException e) {
 +            tx.rollback();
 +
 +            throw e;
 +        }
 +
 +        latch.await();
 +
 +        QueryCursor<Cache.Entry<CacheKey, CacheValue>> qry =
 +            cache1.query(new QuerySqlPredicate(CacheValue.class, "val > 1 and val < 4"));
 +
 +        // Distributed query.
 +        assertEquals(6, qry.getAll().size());
 +
 +        // Create new query, old query cannot be modified after it has been executed.
 +        qry = cache3.localQuery(new QuerySqlPredicate(CacheValue.class, "val > 1 and val < 4"));
 +
 +        // Test local query execution on the node.
 +        Iterator<Cache.Entry<CacheKey, CacheValue>> iter = qry.iterator();
 +
 +        assert iter != null;
 +        assert iter.hasNext();
 +
 +        iter.next();
 +
 +        assert iter.hasNext();
 +
 +        iter.next();
 +
 +        assert !iter.hasNext();
 +    }
 +
 +    /**
 +     * Returns private field {@code qryIters} of {@link GridCacheQueryManager} for the given grid.
 +     *
 +     * @param g Grid which {@link GridCacheQueryManager} should be observed.
 +     * @return {@code qryIters} of {@link GridCacheQueryManager}.
 +     */
 +    private ConcurrentMap<UUID,
 +        Map<Long, GridFutureAdapter<GridCloseableIterator<IgniteBiTuple<CacheKey, CacheValue>>>>>
 +        distributedQueryManagerQueryItersMap(Ignite g) {
 +        GridCacheContext ctx = ((IgniteKernal)g).internalCache().context();
 +
 +        Field qryItersField = ReflectionUtils.findField(ctx.queries().getClass(), "qryIters");
 +
 +        qryItersField.setAccessible(true);
 +
 +        return (ConcurrentMap<UUID,
 +            Map<Long, GridFutureAdapter<GridCloseableIterator<IgniteBiTuple<CacheKey, CacheValue>>>>>)
 +            ReflectionUtils.getField(qryItersField, ctx.queries());
 +    }
 +
 +    /**
 +     * @throws Exception If test failed.
 +     */
 +    public void testToString() throws Exception {
 +        int keyCnt = 4;
 +
 +        for (int i = 1; i <= keyCnt; i++)
 +            cache1.put(new CacheKey(i), new CacheValue(String.valueOf(i)));
 +
 +        // Create query with value filter.
 +
 +        QueryCursor<Cache.Entry<CacheKey, CacheValue>> qry =
 +            cache1.query(new QuerySqlPredicate(CacheValue.class, "val > 0"));
 +
 +        assertEquals(keyCnt * 3, qry.getAll().size());
 +
 +        info("Query result: " + qry.getAll());
 +    }
 +
 +    /**
 +     * TODO enable
 +     *
 +     * @throws Exception If failed.
 +     */
 +    public void _testLostIterator() throws Exception {
 +        IgniteCache<Integer, Integer> cache = ignite.jcache(null);
 +
 +        for (int i = 0; i < 1000; i++)
 +            cache.put(i, i);
 +
 +        QueryCursor<Cache.Entry<Integer, Integer>> fut = null;
 +
 +        for (int i = 0; i < cache.getConfiguration(CacheConfiguration.class).getMaximumQueryIteratorCount() + 1; i++) {
 +            QueryCursor<Cache.Entry<Integer, Integer>> q =
 +                cache.query(new QuerySqlPredicate(Integer.class, "_key >= 0 order by _key"));
 +
 +            assertEquals(0, (int)q.iterator().next().getKey());
 +
 +            if (fut == null)
 +                fut = q;
 +        }
 +
 +        final QueryCursor<Cache.Entry<Integer, Integer>> fut0 = fut;
 +
 +        GridTestUtils.assertThrows(log, new Callable<Object>() {
 +            @Override public Object call() throws Exception {
 +                int i = 0;
 +
 +                Cache.Entry<Integer, Integer> e;
 +
 +                while ((e = fut0.iterator().next()) != null)
 +                    assertEquals(++i, (int)e.getKey());
 +
 +                return null;
 +            }
 +        }, IgniteException.class, null);
 +    }
 +
 +    /**
 +     * TODO enable
 +     *
 +     * @throws Exception If failed.
 +     */
 +    public void _testNodeLeft() throws Exception {
 +        try {
 +            Ignite g = startGrid();
 +
 +            IgniteCache<Integer, Integer> cache = g.jcache(null);
 +
 +            for (int i = 0; i < 1000; i++)
 +                cache.put(i, i);
 +
 +            QueryCursor<Cache.Entry<Integer, Integer>> q =
 +                cache.query(new QuerySqlPredicate(Integer.class, "_key >= 0 order by _key"));
 +
 +            assertEquals(0, (int) q.iterator().next().getKey());
 +
 +            final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<GridCloseableIterator<
 +                IgniteBiTuple<Integer, Integer>>>>> map =
 +                U.field(((IgniteKernal)grid(0)).internalCache().context().queries(), "qryIters");
 +
 +            // q.iterator().next() does not guarantee the request has completed on the remote node
 +            // (we could receive page from local one), so we need to wait.
 +            assertTrue(GridTestUtils.waitForCondition(new PA() {
 +                @Override public boolean apply() {
 +                    return map.size() == 1;
 +                }
 +            }, getTestTimeout()));
 +
 +            Map<Long, GridFutureAdapter<GridCloseableIterator<IgniteBiTuple<Integer, Integer>>>> futs =
 +                map.get(g.cluster().localNode().id());
 +
 +            assertEquals(1, futs.size());
 +
 +            GridCloseableIterator<IgniteBiTuple<Integer, Integer>> iter =
 +                (GridCloseableIterator<IgniteBiTuple<Integer, Integer>>)((IgniteInternalFuture)F.first(futs.values()).get()).get();
 +
 +            ResultSet rs = U.field(iter, "data");
 +
 +            assertFalse(rs.isClosed());
 +
 +            final UUID nodeId = g.cluster().localNode().id();
 +            final CountDownLatch latch = new CountDownLatch(1);
 +
 +            grid(0).events().localListen(new IgnitePredicate<IgniteEvent>() {
 +                @Override public boolean apply(IgniteEvent evt) {
 +                    if (((IgniteDiscoveryEvent)evt).eventNode().id().equals(nodeId))
 +                        latch.countDown();
 +
 +                    return true;
 +                }
 +            }, EVT_NODE_LEFT);
 +
 +            stopGrid();
 +
 +            latch.await();
 +
 +            assertEquals(0, map.size());
 +            assertTrue(rs.isClosed());
 +        }
 +        finally {
 +            // Ensure that additional node is stopped.
 +            stopGrid();
 +        }
 +    }
 +
 +    /**
 +     * @param cache Cache.
 +     * @throws Exception If check failed.
 +     */
 +    private void checkQueryResults(IgniteCache<CacheKey, CacheValue> cache) throws Exception {
 +        QueryCursor<Cache.Entry<CacheKey, CacheValue>> qry =
 +            cache.localQuery(new QuerySqlPredicate(CacheValue.class, "val > 1 and val < 4"));
 +
 +        Iterator<Cache.Entry<CacheKey, CacheValue>> iter = qry.iterator();
 +
 +        assert iter != null;
 +        assert iter.hasNext();
 +
 +        Cache.Entry<CacheKey, CacheValue> entry = iter.next();
 +
 +        assert entry.getKey().equals(new CacheKey(2)) || entry.getKey().equals(new CacheKey(3));
 +
 +        assert iter.hasNext();
 +
 +        entry = iter.next();
 +
 +        assert entry.getKey().equals(new CacheKey(2)) || entry.getKey().equals(new CacheKey(3));
 +        assert !iter.hasNext();
 +    }
 +
 +    /**
 +     * Cache key.
 +     */
 +    private static class CacheKey implements Externalizable {
 +        /** Key. */
 +        private int key;
 +
 +        /**
 +         * @param key Key.
 +         */
 +        CacheKey(int key) {
 +            this.key = key;
 +        }
 +
 +        /**
 +         * Empty constructor required by {@link Externalizable}.
 +         */
 +        public CacheKey() {
 +            /* No-op. */
 +        }
 +
 +        /**
 +         * @return Key.
 +         */
 +        public int getKey() {
 +            return key;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
 +            key = in.readInt();
 +
 +            keyDesCnt++;
 +
 +            if (TEST_DEBUG)
 +                X.println("Deserialized demo key [keyDesCnt=" + keyDesCnt + ", key=" + this + ']');
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void writeExternal(ObjectOutput out) throws IOException {
 +            out.writeInt(key);
 +
 +            keySerCnt++;
 +
 +            if (TEST_DEBUG)
 +                X.println("Serialized demo key [serCnt=" + keySerCnt + ", key=" + this + ']');
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public boolean equals(Object o) {
 +            CacheKey cacheKey;
 +
 +            if (o instanceof CacheKey)
 +                cacheKey = (CacheKey)o;
 +            else
 +                return false;
 +
 +            return key == cacheKey.key;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public int hashCode() {
 +            return key;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return S.toString(CacheKey.class, this);
 +        }
 +    }
 +
 +    /**
 +     * Cache value.
 +     */
 +    private static class CacheValue implements Externalizable {
 +        /** Value. */
 +        @QuerySqlField
 +        private String val;
 +
 +        /**
 +         * @param val Value.
 +         */
 +        CacheValue(String val) {
 +            this.val = val;
 +        }
 +
 +        /**
 +         * Empty constructor required by {@link Externalizable}.
 +         */
 +        public CacheValue() {
 +            /* No-op. */
 +        }
 +
 +        /**
 +         * @return Value.
 +         */
 +        public String getValue() {
 +            return val;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
 +            val = U.readString(in);
 +
 +            valDesCnt++;
 +
 +            if (TEST_DEBUG)
 +                X.println("Deserialized demo value [valDesCnt=" + valDesCnt + ", val=" + this + ']');
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void writeExternal(ObjectOutput out) throws IOException {
 +            U.writeString(out, val);
 +
 +            valSerCnt++;
 +
 +            if (TEST_DEBUG)
 +                X.println("Serialized demo value [serCnt=" + valSerCnt + ", val=" + this + ']');
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public boolean equals(Object o) {
 +            if (this == o)
 +                return true;
 +
 +            if (o == null || getClass() != o.getClass())
 +                return false;
 +
 +            CacheValue val = (CacheValue)o;
 +
 +            return !(this.val != null ? !this.val.equals(val.val) : val.val != null);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public int hashCode() {
 +            return val != null ? val.hashCode() : 0;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return S.toString(CacheValue.class, this);
 +        }
 +    }
 +}


Mime
View raw message