ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject [05/50] [abbrv] incubator-ignite git commit: Merge branch ignite-sql-tests into ignite-45
Date Mon, 16 Mar 2015 09:22:52 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java
index 0000000,88183c7..74d54af
mode 000000,100644..100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheSqlQueryMultiThreadedSelfTest.java
@@@ -1,0 -1,197 +1,197 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.query.*;
+ import org.apache.ignite.cache.query.annotations.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.internal.*;
++import org.apache.ignite.internal.processors.cache.query.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.spi.discovery.tcp.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+ import org.apache.ignite.testframework.*;
+ import org.apache.ignite.testframework.junits.common.*;
+ 
+ import javax.cache.*;
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static org.apache.ignite.cache.CacheAtomicityMode.*;
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
+ import static org.apache.ignite.cache.CacheMode.*;
+ 
+ /**
+  * Tests SQL queries executed against one cache from multiple threads concurrently.
+  */
+ public class IgniteCacheSqlQueryMultiThreadedSelfTest extends GridCommonAbstractTest {
+     /** IP finder shared by the discovery SPI of every grid configured by this test. */
+     private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+ 
+     /** {@inheritDoc} */
+     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+         IgniteConfiguration c = super.getConfiguration(gridName);
+ 
+         TcpDiscoverySpi disco = new TcpDiscoverySpi();
+ 
+         disco.setIpFinder(ipFinder); // Same static IP finder for every grid.
+ 
+         c.setDiscoverySpi(disco);
+ 
+         CacheConfiguration<?,?> ccfg = new CacheConfiguration();
+ 
+         ccfg.setCacheMode(PARTITIONED);
 -        ccfg.setDistributionMode(PARTITIONED_ONLY);
++        ccfg.setNearConfiguration(null); // Takes the place of the removed PARTITIONED_ONLY mode; presumably null means no near cache - confirm.
+         ccfg.setBackups(1);
+         ccfg.setAtomicityMode(TRANSACTIONAL);
+         ccfg.setIndexedTypes( // Integer keys and Person values, matching what the tests put into the cache.
+             Integer.class, Person.class
+         );
+ 
+         c.setCacheConfiguration(ccfg);
+ 
+         return c;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void beforeTestsStarted() throws Exception {
+         super.beforeTestsStarted();
+ 
+         startGrids(2);
+ 
+         awaitPartitionMapExchange();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void afterTestsStopped() throws Exception {
+         super.afterTestsStopped();
+ 
+         stopAllGrids();
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testQuery() throws Exception {
+         final IgniteCache<Integer, Person> cache = grid(0).jcache(null);
+ 
+         cache.clear();
+ 
+         for (int i = 0; i < 2000; i++) // Ages 0..1999, so every entry satisfies "age >= 0".
+             cache.put(i, new Person(i));
+ 
+         GridTestUtils.runMultiThreaded(new Callable<Void>() {
+             @Override public Void call() throws Exception {
+                 for (int i = 0; i < 100; i++) { // Each caller repeats the query 100 times.
+                     QueryCursor<Cache.Entry<Integer, Person>> qry =
+                         cache.query(new SqlQuery("Person", "age >= 0"));
+ 
+                     int cnt = 0;
+ 
+                     for (Cache.Entry<Integer, Person> e : qry)
+                         cnt++;
+ 
+                     assertEquals(2000, cnt); // Every entry put above must come back.
+                 }
+ 
+                 return null;
+             }
+         }, 16, "test"); // Presumably thread count and thread name prefix - confirm against GridTestUtils.
+     }
+ 
+     /**
+      * Runs aggregate queries and random puts concurrently for 30 seconds; asserts only that each query yields one row.
+      * @throws Exception If failed.
+      */
+     public void testQueryPut() throws Exception {
+         final IgniteCache<Integer, Person> cache = grid(0).jcache(null);
+ 
+         cache.clear();
+ 
+         final AtomicBoolean stop = new AtomicBoolean(); // Flipped after the sleep below to end both loops.
+ 
+         IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Void>()
{
+             @Override public Void call() throws Exception {
+                 Random rnd = new GridRandom();
+ 
+                 while (!stop.get()) {
+                     List<List<?>> res = cache.queryFields(
+                         new SqlFieldsQuery("select avg(age) from Person where age > 0")).getAll();
+ 
+                     assertEquals(1, res.size());
+ 
+                     if (res.get(0).get(0) == null) // Presumably null while no person with age > 0 exists yet.
+                         continue;
+ 
+                     int avgAge = ((Number)res.get(0).get(0)).intValue();
+ 
+                     if (rnd.nextInt(300) == 0) // Print roughly one result in 300 to limit output.
+                         X.println("__ " + avgAge);
+                 }
+ 
+                 return null;
+             }
+         }, 20);
+ 
+         IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Void>()
{
+             @Override public Void call() throws Exception {
+                 Random rnd = new GridRandom();
+                 Random age = new GridRandom();
+ 
+                 while (!stop.get()) // Keys in [0, 2000); ages in [-1000, 2000), so some rows fail "age > 0".
+                     cache.put(rnd.nextInt(2000), new Person(age.nextInt(3000) - 1000));
+ 
+                 return null;
+             }
+         }, 20);
+ 
+         Thread.sleep(30 * 1000);
+ 
+         stop.set(true);
+ 
+         fut2.get(10 * 1000);
+         fut1.get(10 * 1000);
+     }
+ 
+     /**
+      * Cache value type with a single SQL-queryable age field.
+      */
+     private static class Person implements Serializable {
+         /** Age; annotated as an indexed SQL field. */
+         @QuerySqlField(index = true)
+         private int age;
+ 
+         /**
+          * @param age Age.
+          */
+         Person(int age) {
+             this.age = age;
+         }
+ 
+         /**
+          * @return Age.
+          */
+         public int age() {
+             return age;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicFieldsQuerySelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicFieldsQuerySelfTest.java
index 0000000,406814e..2311139
mode 000000,100644..100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicFieldsQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicFieldsQuerySelfTest.java
@@@ -1,0 -1,59 +1,59 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed.near;
+ 
+ import org.apache.ignite.cache.*;
++import org.apache.ignite.configuration.*;
+ import org.apache.ignite.cache.query.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ 
+ import java.util.*;
+ 
+ import static org.apache.ignite.cache.CacheAtomicityMode.*;
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
+ 
+ /**
+  * Tests for fields queries on an atomic cache whose near configuration is {@code null}.
+  */
+ public class IgniteCacheAtomicFieldsQuerySelfTest extends IgniteCachePartitionedFieldsQuerySelfTest
{
+     /** {@inheritDoc} */
+     @Override protected CacheAtomicityMode atomicityMode() {
+         return ATOMIC;
+     }
+ 
+     /** {@inheritDoc} */
 -    @Override protected CacheDistributionMode distributionMode() {
 -        return PARTITIONED_ONLY;
++    @Override protected NearCacheConfiguration nearConfiguration() {
++        return null;
+     }
+ 
+     /**
+      * Checks that an SQL update statement submitted as a fields query is rejected with an exception.
+      */
+     public void testUnsupportedOperations() {
+         try {
+             QueryCursor<List<?>> qry = grid(0).jcache(null).queryFields(new
SqlFieldsQuery(
+                 "update Person set name = ?").setArgs("Mary Poppins"));
+ 
+             qry.getAll();
+ 
+             fail("We don't support updates."); // NOTE(review): relies on fail() throwing an Error, not an Exception, so the catch below does not swallow it - confirm.
+         }
+         catch (Exception e) {
+             X.println("___ " + e.getMessage()); // Expected path: the rejection message is only printed.
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledFieldsQuerySelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledFieldsQuerySelfTest.java
index 0000000,b75c0a7..d5e02c2
mode 000000,100644..100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledFieldsQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheAtomicNearEnabledFieldsQuerySelfTest.java
@@@ -1,0 -1,32 +1,30 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed.near;
+ 
 -import org.apache.ignite.cache.*;
 -
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
++import org.apache.ignite.configuration.*;
+ 
+ /**
+  * Tests for atomic cache with near cache enabled.
+  */
+ public class IgniteCacheAtomicNearEnabledFieldsQuerySelfTest extends IgniteCacheAtomicFieldsQuerySelfTest
{
+     /** {@inheritDoc} */
 -    @Override protected CacheDistributionMode distributionMode() {
 -        return NEAR_PARTITIONED;
++    @Override protected NearCacheConfiguration nearConfiguration() {
++        return new NearCacheConfiguration(); // Default-constructed near configuration, where the parent test returns null.
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
index 0000000,a804d39..1518e09
mode 000000,100644..100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
@@@ -1,0 -1,217 +1,216 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed.near;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.cache.query.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.spi.discovery.tcp.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+ 
+ import javax.cache.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static org.apache.ignite.cache.CacheAtomicityMode.*;
+ import static org.apache.ignite.cache.CacheDistributionMode.*;
+ import static org.apache.ignite.cache.CacheMode.*;
+ 
+ /**
+  * Test for distributed queries with node restarts.
+  */
+ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTest {
+     /** Number of grids reported by {@link #gridCount()}. */
+     private static final int GRID_CNT = 3;
+ 
+     /** Number of keys put into the cache before the queries start. */
+     private static final int KEY_CNT = 1000;
+ 
+     /** IP finder shared by the discovery SPI of every grid configured by this test. */
+     private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+ 
+     /** {@inheritDoc} */
+     @Override protected int gridCount() {
+         return GRID_CNT;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected long getTestTimeout() {
+         return 90 * 1000; // testRestarts sleeps for 60 s and may then wait up to 15 s for events.
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+         IgniteConfiguration c = super.getConfiguration(gridName);
+ 
+         TcpDiscoverySpi disco = new TcpDiscoverySpi();
+ 
+         disco.setIpFinder(ipFinder); // Same static IP finder for every grid.
+ 
+         c.setDiscoverySpi(disco);
+ 
+         CacheConfiguration cc = defaultCacheConfiguration();
+ 
+         cc.setCacheMode(PARTITIONED);
+         cc.setBackups(1);
+         cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+         cc.setAtomicityMode(TRANSACTIONAL); // NOTE(review): the removed setDistributionMode(NEAR_PARTITIONED) below has no replacement; whether defaultCacheConfiguration() enables a near cache is not visible here - confirm.
 -        cc.setDistributionMode(NEAR_PARTITIONED);
+ 
+         c.setCacheConfiguration(cc);
+ 
+         return c;
+     }
+ 
+     /**
+      * Runs SQL queries from several threads while one extra node is repeatedly started and stopped.
+      *
+      * @throws Exception If failed.
+      */
+     @SuppressWarnings({"TooBroadScope"})
+     public void testRestarts() throws Exception {
+         int duration = 60 * 1000; // Milliseconds; passed to Thread.sleep() below.
+         int qryThreadNum = 10;
+         final long nodeLifeTime = 2 * 1000; // Milliseconds the extra node stays up between start and stop.
+         final int logFreq = 20; // Log every 20th query and every 20th restart.
+ 
+         final IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
+ 
+         assert cache != null;
+ 
+         for (int i = 0; i < KEY_CNT; i++)
+             cache.put(i, i);
+ 
+         assertEquals(KEY_CNT, cache.localSize());
+ 
+         final AtomicInteger qryCnt = new AtomicInteger();
+ 
+         final AtomicBoolean done = new AtomicBoolean(); // Set after the sleep below to stop both loops.
+ 
+         IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() {
+             @Override public void applyx() throws IgniteCheckedException {
+                 while (!done.get()) {
+                     Collection<Cache.Entry<Integer, Integer>> res =
+                         cache.query(new SqlQuery(Integer.class, "_val >= 0")).getAll();
+ 
+                     assertFalse(res.isEmpty()); // Only non-emptiness is checked, not the exact result size.
+ 
+                     int c = qryCnt.incrementAndGet();
+ 
+                     if (c % logFreq == 0)
+                         info("Executed queries: " + c);
+                 }
+             }
+         }, qryThreadNum);
+ 
+         final AtomicInteger restartCnt = new AtomicInteger();
+ 
+         CollectingEventListener lsnr = new CollectingEventListener();
+ 
+         for (int i = 0; i < GRID_CNT; i++)
+             grid(i).events().localListen(lsnr, EventType.EVT_CACHE_REBALANCE_STOPPED);
+ 
+         IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>()
{
+             @SuppressWarnings({"BusyWait"})
+             @Override public Object call() throws Exception {
+                 while (!done.get()) {
+                     int idx = GRID_CNT; // Extra node; grids 0..GRID_CNT-1 are the ones listened on above.
+ 
+                     startGrid(idx);
+ 
+                     Thread.sleep(nodeLifeTime);
+ 
+                     stopGrid(idx);
+ 
+                     int c = restartCnt.incrementAndGet();
+ 
+                     if (c % logFreq == 0)
+                         info("Node restarts: " + c);
+                 }
+ 
+                 return true;
+             }
+         }, 1);
+ 
+         Thread.sleep(duration);
+ 
+         done.set(true);
+ 
+         fut1.get();
+         fut2.get();
+ 
+         info("Awaiting rebalance events [restartCnt=" + restartCnt.get() + ']');
+ 
+         boolean success = lsnr.awaitEvents(GRID_CNT * 2 * restartCnt.get(), 15000); // Two events per listened grid per restart; presumably one for the start and one for the stop - confirm.
+ 
+         for (int i = 0; i < GRID_CNT; i++)
+             grid(i).events().stopLocalListen(lsnr, EventType.EVT_CACHE_REBALANCE_STOPPED);
+ 
+         assert success; // NOTE(review): a plain assert is skipped unless the JVM runs with -ea.
+     }
+ 
+     /** Listener that will wait for specified number of events received. */
+     private class CollectingEventListener implements IgnitePredicate<Event> {
+         /** Registered events count. */
+         private int evtCnt;
+ 
+         /** {@inheritDoc} */
+         @Override public synchronized boolean apply(Event evt) {
+             evtCnt++;
+ 
+             info("Processed event [evt=" + evt + ", evtCnt=" + evtCnt + ']');
+ 
+             notifyAll(); // Wake any thread blocked in awaitEvents().
+ 
+             return true;
+         }
+ 
+         /**
+          * Waits until the total number of events processed is equal to or greater than the
+          * argument passed.
+          *
+          * @param cnt Number of events to wait.
+          * @param timeout Timeout to wait.
+          * @return {@code True} if successfully waited, {@code false} if timeout happened.
+          * @throws InterruptedException If thread is interrupted.
+          */
+         public synchronized boolean awaitEvents(int cnt, long timeout) throws InterruptedException
{
+             long start = U.currentTimeMillis();
+ 
+             long now = start;
+ 
+             while (start + timeout > now) {
+                 if (evtCnt >= cnt)
+                     return true;
+ 
+                 wait(start + timeout - now); // Strictly positive by the loop condition, so never an unbounded wait(0).
+ 
+                 now = U.currentTimeMillis();
+             }
+ 
+             return false; // NOTE(review): evtCnt is not re-checked after the final wait, so an event arriving right at the deadline is reported as a timeout.
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
index 0000000,e7fdb3c..b79b9df
mode 000000,100644..100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
@@@ -1,0 -1,414 +1,413 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.reducefields;
+ 
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.cache.affinity.*;
+ import org.apache.ignite.cache.query.annotations.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.query.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.marshaller.optimized.*;
+ import org.apache.ignite.spi.discovery.*;
+ import org.apache.ignite.spi.discovery.tcp.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+ import org.apache.ignite.testframework.junits.common.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.util.*;
+ 
+ import static org.apache.ignite.cache.CacheAtomicityMode.*;
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
+ import static org.apache.ignite.cache.CacheMode.*;
+ import static org.apache.ignite.cache.CacheRebalanceMode.*;
+ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+ 
+ /**
+  * Tests for reduce fields queries.
+  */
+ public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCommonAbstractTest
{
+     /** IP finder. */
+     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+ 
+     /** Flag indicating if starting node should have cache. */
+     protected boolean hasCache;
+ 
+     /** {@inheritDoc} */
+     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+         IgniteConfiguration cfg = super.getConfiguration(gridName);
+ 
+         if (hasCache) // Toggled by beforeTestsStarted() around grid startup.
+             cfg.setCacheConfiguration(cache(null));
+         else
+             cfg.setCacheConfiguration(); // No arguments: this node is given no cache configuration.
+ 
+         cfg.setDiscoverySpi(discovery());
+         cfg.setMarshaller(new OptimizedMarshaller(false));
+ 
+         return cfg;
+     }
+ 
+     /**
+      * @return Near cache configuration applied to the test cache; subclasses may override.
+      */
 -    protected CacheDistributionMode distributionMode() {
 -        return NEAR_PARTITIONED;
++    protected NearCacheConfiguration nearConfiguration() {
++        return new NearCacheConfiguration();
+     }
+ 
+     /**
+      * @param name Cache name.
+      * @return Cache configuration built from the mode hooks of this test.
+      */
+     private CacheConfiguration cache(@Nullable String name) {
+         CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+ 
+         cache.setName(name);
+         cache.setCacheMode(cacheMode());
+         cache.setAtomicityMode(atomicityMode());
 -        cache.setDistributionMode(distributionMode());
++        cache.setNearConfiguration(nearConfiguration());
+         cache.setWriteSynchronizationMode(FULL_SYNC);
+         cache.setRebalanceMode(SYNC);
+         cache.setIndexedTypes( // Key/value classes matching the two kinds of entries put in beforeTestsStarted().
+             String.class, Organization.class,
+             CacheAffinityKey.class, Person.class
+         );
+ 
+         if (cacheMode() == PARTITIONED) // Backups are configured only for partitioned mode.
+             cache.setBackups(1);
+ 
+         return cache;
+     }
+ 
+     /**
+      * @return Discovery SPI.
+      */
+     private static DiscoverySpi discovery() {
+         TcpDiscoverySpi spi = new TcpDiscoverySpi();
+ 
+         spi.setIpFinder(IP_FINDER);
+ 
+         return spi;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void beforeTestsStarted() throws Exception {
+         hasCache = true;
+ 
+         startGridsMultiThreaded(gridCount());
+ 
+         hasCache = false;
+ 
+         startGrid(gridCount()); // One extra node, configured without any cache.
+ 
+         GridCache<String, Organization> orgCache = ((IgniteKernal)grid(0)).cache(null);
+ 
+         assert orgCache != null;
+ 
+         assert orgCache.putx("o1", new Organization(1, "A")); // NOTE(review): the put is the assert expression, so no data is stored unless the JVM runs with -ea.
+         assert orgCache.putx("o2", new Organization(2, "B"));
+ 
+         GridCache<CacheAffinityKey<String>, Person> personCache = ((IgniteKernal)grid(0)).cache(null); // Same null-named cache as orgCache, typed for person entries.
+ 
+         assert personCache != null;
+ 
+         assert personCache.putx(new CacheAffinityKey<>("p1", "o1"), new Person("John
White", 25, 1));
+         assert personCache.putx(new CacheAffinityKey<>("p2", "o1"), new Person("Joe
Black", 35, 1));
+         assert personCache.putx(new CacheAffinityKey<>("p3", "o2"), new Person("Mike
Green", 40, 2));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void afterTestsStopped() throws Exception {
+         stopAllGrids();
+     }
+ 
+     /**
+      * @return cache mode.
+      */
+     protected abstract CacheMode cacheMode();
+ 
+     /**
+      * @return Number of grids to start.
+      */
+     protected abstract int gridCount();
+ 
+     /**
+      * @return Cache atomicity mode.
+      */
+     protected CacheAtomicityMode atomicityMode() {
+         return TRANSACTIONAL;
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testNoDataInCache() throws Exception {
+         CacheQuery<List<?>> qry = ((IgniteKernal)grid(0))
+             .cache(null).queries().createSqlFieldsQuery("select age from Person where orgId
= 999"); // No person stored in beforeTestsStarted() has orgId 999.
+ 
+         Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer()).get();
+ 
+         assertEquals("Result", 0, F.reduce(res, new AverageLocalReducer()).intValue()); // AverageLocalReducer yields 0 when the total count is 0.
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testAverageQuery() throws Exception {
+         CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)).cache(null).queries().createSqlFieldsQuery("select
age from Person");
+ 
+         Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer()).get();
+ 
+         assertEquals("Average", 33, F.reduce(res, new AverageLocalReducer()).intValue()); // (25 + 35 + 40) / 3 = 33 with integer division.
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testAverageQueryWithArguments() throws Exception {
+         CacheQuery<List<?>> qry = ((IgniteKernal)grid(0)).cache(null).queries().createSqlFieldsQuery(
+             "select age from Person where orgId = ?");
+ 
+         Collection<IgniteBiTuple<Integer, Integer>> res = qry.execute(new AverageRemoteReducer(),
1).get();
+ 
+         assertEquals("Average", 30, F.reduce(res, new AverageLocalReducer()).intValue()); // Persons with orgId 1: (25 + 35) / 2 = 30.
+     }
+ 
+ //    /**
+ //     * @throws Exception If failed.
+ //     */
+ //    public void testFilters() throws Exception {
+ //        GridCacheReduceFieldsQuery<Object, Object, GridBiTuple<Integer, Integer>,
Integer> qry = ((IgniteKernal)grid(0)).cache(null)
+ //            .queries().createReduceFieldsQuery("select age from Person");
+ //
+ //        qry = qry.remoteKeyFilter(
+ //            new GridPredicate<Object>() {
+ //                @Override public boolean apply(Object e) {
+ //                    return !"p2".equals(((CacheAffinityKey)e).key());
+ //                }
+ //            }
+ //        ).remoteValueFilter(
+ //            new P1<Object>() {
+ //                @Override public boolean apply(Object e) {
+ //                    return !"Mike Green".equals(((Person)e).name);
+ //                }
+ //            }
+ //        );
+ //
+ //        qry = qry.remoteReducer(new AverageRemoteReducer()).localReducer(new AverageLocalReducer());
+ //
+ //        Integer avg = qry.reduce().get();
+ //
+ //        assertNotNull("Average", avg);
+ //        assertEquals("Average", 25, avg.intValue());
+ //    }
+ 
+ //    /**
+ //     * @throws Exception If failed.
+ //     */
+ //    public void testOnProjectionWithFilter() throws Exception {
+ //        P2<CacheAffinityKey<String>, Person> p = new P2<CacheAffinityKey<String>,
Person>() {
+ //            @Override public boolean apply(CacheAffinityKey<String> key, Person
val) {
+ //                return val.orgId == 1;
+ //            }
+ //        };
+ //
+ //        CacheProjection<CacheAffinityKey<String>, Person> cachePrj =
+ //            grid(0).<CacheAffinityKey<String>, Person>cache(null).projection(p);
+ //
+ //        GridCacheReduceFieldsQuery<CacheAffinityKey<String>, Person, GridBiTuple<Integer,
Integer>, Integer> qry =
+ //            cachePrj.queries().createReduceFieldsQuery("select age from Person");
+ //
+ //        qry = qry.remoteValueFilter(
+ //            new P1<Person>() {
+ //                @Override public boolean apply(Person e) {
+ //                    return !"Joe Black".equals(e.name);
+ //                }
+ //            });
+ //
+ //        qry = qry.remoteReducer(new AverageRemoteReducer()).localReducer(new AverageLocalReducer());
+ //
+ //        Integer avg = qry.reduce().get();
+ //
+ //        assertNotNull("Average", avg);
+ //        assertEquals("Average", 25, avg.intValue());
+ //    }
+ 
+     /**
+      * @return {@code true} if cache mode is replicated, {@code false} otherwise.
+      */
+     private boolean isReplicatedMode() {
+         return cacheMode() == REPLICATED; // NOTE(review): private and not called anywhere in this class - appears unused.
+     }
+ 
+     /**
+      * Person.
+      */
+     @SuppressWarnings("UnusedDeclaration")
+     private static class Person implements Serializable {
+         /** Name. */
+         @QuerySqlField(index = false)
+         private final String name;
+ 
+         /** Age. */
+         @QuerySqlField(index = true)
+         private final int age;
+ 
+         /** Organization ID. */
+         @QuerySqlField(index = true)
+         private final int orgId;
+ 
+         /**
+          * @param name Name.
+          * @param age Age.
+          * @param orgId Organization ID.
+          */
+         private Person(String name, int age, int orgId) {
+             assert !F.isEmpty(name);
+             assert age > 0;
+             assert orgId > 0;
+ 
+             this.name = name;
+             this.age = age;
+             this.orgId = orgId;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean equals(Object o) {
+             if (this == o)
+                 return true;
+ 
+             if (o == null || getClass() != o.getClass())
+                 return false;
+ 
+             Person person = (Person)o;
+ 
+             return age == person.age && orgId == person.orgId && name.equals(person.name);
+ 
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public int hashCode() {
+             int res = name.hashCode();
+ 
+             res = 31 * res + age;
+             res = 31 * res + orgId;
+ 
+             return res;
+         }
+     }
+ 
+     /**
+      * Organization.
+      */
+     @SuppressWarnings("UnusedDeclaration")
+     private static class Organization implements Serializable {
+         /** ID. */
+         @QuerySqlField
+         private final int id;
+ 
+         /** Name. */
+         @QuerySqlField(index = false)
+         private final String name;
+ 
+         /**
+          * @param id ID.
+          * @param name Name.
+          */
+         private Organization(int id, String name) {
+             assert id > 0;
+             assert !F.isEmpty(name);
+ 
+             this.id = id;
+             this.name = name;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean equals(Object o) {
+             if (this == o)
+                 return true;
+ 
+             if (o == null || getClass() != o.getClass())
+                 return false;
+ 
+             Organization that = (Organization)o;
+ 
+             return id == that.id && name.equals(that.name);
+ 
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public int hashCode() {
+             int res = id;
+ 
+             res = 31 * res + name.hashCode();
+ 
+             return res;
+         }
+     }
+ 
+     /**
+      * Remote reducer that sums the first column of each collected row and reduces to a (sum, count) tuple.
+      */
+     protected static class AverageRemoteReducer implements IgniteReducer<List<?>,
IgniteBiTuple<Integer, Integer>> {
+         /** Running sum of the collected values. */
+         private int sum;
+ 
+         /** Number of collected rows. */
+         private int cnt;
+ 
+         @Override public boolean collect(List<?> e) {
+             sum += (Integer)e.get(0); // First column is cast to Integer; the queries in this test select only "age".
+ 
+             cnt++;
+ 
+             return true;
+         }
+ 
+         @Override public IgniteBiTuple<Integer, Integer> reduce() {
+             return F.t(sum, cnt);
+         }
+     }
+ 
+     /**
+      * Local reducer that merges (sum, count) tuples and reduces to their integer average.
+      */
+     protected static class AverageLocalReducer implements IgniteReducer<IgniteBiTuple<Integer,
Integer>, Integer> {
+         /** Total of the first tuple components. */
+         private int sum;
+ 
+         /** Total of the second tuple components. */
+         private int cnt;
+ 
+         @Override public boolean collect(IgniteBiTuple<Integer, Integer> t) {
+             sum += t.get1();
+             cnt += t.get2();
+ 
+             return true;
+         }
+ 
+         @Override public Integer reduce() {
+             return cnt == 0 ? 0 : sum / cnt; // Integer division; 0 when nothing was collected, avoiding division by zero.
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java
index 0000000,fd93a3e..ee463b6
mode 000000,100644..100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheReduceFieldsQueryAtomicSelfTest.java
@@@ -1,0 -1,38 +1,38 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.reducefields;
+ 
+ import org.apache.ignite.cache.*;
++import org.apache.ignite.configuration.*;
+ 
+ import static org.apache.ignite.cache.CacheAtomicityMode.*;
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
+ 
+ /**
+  * Reduce fields queries tests for partitioned cache in {@code ATOMIC} atomicity mode.
+  */
+ public class GridCacheReduceFieldsQueryAtomicSelfTest extends GridCacheReduceFieldsQueryPartitionedSelfTest
{
+     /** {@inheritDoc} */
+     @Override protected CacheAtomicityMode atomicityMode() {
+         return ATOMIC;
+     }
+ 
+     /** {@inheritDoc} */
 -    @Override protected CacheDistributionMode distributionMode() {
 -        return PARTITIONED_ONLY;
++    @Override protected NearCacheConfiguration nearConfiguration() {
++        return null;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
index e3fc936,f7e50ad..ed729ad
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
@@@ -73,17 -70,15 +70,15 @@@ public class GridQueryParsingTest exten
  
          cc.setCacheMode(CacheMode.PARTITIONED);
          cc.setAtomicityMode(CacheAtomicityMode.ATOMIC);
 -        cc.setDistributionMode(PARTITIONED_ONLY);
 +        cc.setNearConfiguration(null);
          cc.setWriteSynchronizationMode(FULL_SYNC);
-         cc.setPreloadMode(SYNC);
+         cc.setRebalanceMode(SYNC);
          cc.setSwapEnabled(false);
- 
-         CacheQueryConfiguration qcfg = new CacheQueryConfiguration();
- 
-         qcfg.setIndexPrimitiveKey(true);
-         qcfg.setIndexFixedTyping(true);
- 
-         cc.setQueryConfiguration(qcfg);
+         cc.setSqlFunctionClasses(GridQueryParsingTest.class);
+         cc.setIndexedTypes(
+             String.class, Address.class,
+             String.class, Person.class
+         );
  
          c.setCacheConfiguration(cc);
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
----------------------------------------------------------------------


Mime
View raw message