ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [1/6] incubator-ignite git commit: IGNITE-143 - Continuous queries refactoring (manual merge)
Date Sat, 14 Feb 2015 01:21:02 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-sql-old 3b8f9a64d -> b42fdcd24


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java
index f9c7ee2..5d0d84d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java
@@ -17,11 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
+import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.query.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.lang.*;
 
+import javax.cache.*;
+import javax.cache.event.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -46,40 +47,38 @@ public class GridCacheContinuousQueryReplicatedSelfTest extends GridCacheContinu
     /**
      * @throws Exception If failed.
      */
-    @SuppressWarnings("unchecked")
     public void testRemoteNodeCallback() throws Exception {
-        GridCache<Integer, Integer> cache1 = grid(0).cache(null);
+        IgniteCache<Integer, Integer> cache1 = grid(0).jcache(null);
+        IgniteCache<Integer, Integer> cache2 = grid(1).jcache(null);
 
-        GridCache<Integer, Integer> cache2 = grid(1).cache(null);
-
-        CacheContinuousQuery<Integer, Integer> qry = cache2.queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry = Query.continuous();
 
         final AtomicReference<Integer> val = new AtomicReference<>();
         final CountDownLatch latch = new CountDownLatch(1);
 
-        qry.localCallback(new P2<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID uuid, Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                assertEquals(1, entries.size());
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> it = evts.iterator();
+
+                CacheEntryEvent<? extends Integer, ? extends Integer> e = it.next();
 
-                Map.Entry<Integer, Integer> e = entries.iterator().next();
+                assert !it.hasNext();
 
-                log.info("Entry: " + e);
+                log.info("Event: " + e);
 
                 val.set(e.getValue());
 
                 latch.countDown();
-
-                return false;
             }
         });
 
-        qry.execute();
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache2.query(qry)) {
+            cache1.put(1, 10);
 
-        cache1.put(1, 10);
+            latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
-        latch.await(LATCH_TIMEOUT, MILLISECONDS);
-
-        assertEquals(10, val.get().intValue());
+            assertEquals(10, val.get().intValue());
+        }
     }
 
     /**
@@ -87,61 +86,49 @@ public class GridCacheContinuousQueryReplicatedSelfTest extends GridCacheContinu
      *
      * @throws Exception If failed.
      */
-    @SuppressWarnings("unchecked")
     public void testCrossCallback() throws Exception {
         // Prepare.
-        GridCache<Integer, Integer> cache1 = grid(0).cache(null);
-        GridCache<Integer, Integer> cache2 = grid(1).cache(null);
+        IgniteCache<Integer, Integer> cache1 = grid(0).jcache(null);
+        IgniteCache<Integer, Integer> cache2 = grid(1).jcache(null);
 
-        final int key1 = primaryKey(jcache(0));
-        final int key2 = primaryKey(jcache(1));
+        final int key1 = primaryKey(cache1);
+        final int key2 = primaryKey(cache2);
 
         final CountDownLatch latch1 = new CountDownLatch(2);
         final CountDownLatch latch2 = new CountDownLatch(2);
 
+        ContinuousQuery<Integer, Integer> qry1 = Query.continuous();
 
-        // Start query on the first node.
-        CacheContinuousQuery<Integer, Integer> qry1 = cache1.queries().createContinuousQuery();
-
-        qry1.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeID,
-                Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (CacheContinuousQueryEntry entry : entries) {
-                    log.info("Update in cache 1: " + entry);
+        qry1.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+                    log.info("Update in cache 1: " + evt);
 
-                    if (entry.getKey() == key1 || entry.getKey() == key2)
-                        latch1.countDown();
+                    if (evt.getKey() == key1 || evt.getKey() == key2) latch1.countDown();
                 }
-
-                return latch1.getCount() != 0;
             }
         });
 
-        qry1.execute();
-
-        // Start query on the second node.
-        CacheContinuousQuery<Integer, Integer> qry2 = cache2.queries().createContinuousQuery();
+        ContinuousQuery<Integer, Integer> qry2 = Query.continuous();
 
-        qry2.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
-            @Override public boolean apply(UUID nodeID,
-                Collection<CacheContinuousQueryEntry<Integer, Integer>> entries) {
-                for (CacheContinuousQueryEntry entry : entries) {
-                    log.info("Update in cache 2: " + entry);
+        qry2.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) {
+                    log.info("Update in cache 2: " + evt);
 
-                    if (entry.getKey() == key1 || entry.getKey() == key2)
+                    if (evt.getKey() == key1 || evt.getKey() == key2)
                         latch2.countDown();
                 }
-
-                return latch2.getCount() != 0;
             }
         });
 
-        qry2.execute();
-
-        cache1.put(key1, key1);
-        cache1.put(key2, key2);
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache2.query(qry1);
+             QueryCursor<Cache.Entry<Integer, Integer>> ignore = cache2.query(qry2)) {
+            cache1.put(key1, key1);
+            cache1.put(key2, key2);
 
-        assert latch1.await(LATCH_TIMEOUT, MILLISECONDS);
-        assert latch2.await(LATCH_TIMEOUT, MILLISECONDS);
+            assert latch1.await(LATCH_TIMEOUT, MILLISECONDS);
+            assert latch2.await(LATCH_TIMEOUT, MILLISECONDS);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java
index d06d53f..6fe1cc5 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java
@@ -18,18 +18,18 @@
 package org.apache.ignite.loadtests.continuous;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.continuous.*;
-import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -79,17 +79,17 @@ public class GridContinuousOperationsLoadTest {
         dumpProperties(System.out);
 
         try (Ignite ignite = Ignition.start(cfgPath)) {
-            final GridCache<Object, Object> cache = ignite.cache(cacheName);
+            final IgniteCache<Object, Object> cache = ignite.jcache(cacheName);
 
             if (cache == null)
                 throw new IgniteCheckedException("Cache is not configured: " + cacheName);
 
             // Continuous query manager, used to monitor queue size.
-            final GridCacheContinuousQueryManager contQryMgr =
+            final CacheContinuousQueryManager contQryMgr =
                 ((GridCacheAdapter)((GridCacheProxyImpl)cache).cache()).context().continuousQueries();
 
             if (contQryMgr == null)
-                throw new IgniteCheckedException("Could not access GridCacheContinuousQueryManager");
+                throw new IgniteCheckedException("Could not access CacheContinuousQueryManager");
 
             final AtomicBoolean stop = new AtomicBoolean(); // Stop flag.
             final AtomicLong cbCntr = new AtomicLong();     // Callback counter.
@@ -97,33 +97,43 @@ public class GridContinuousOperationsLoadTest {
 
             for (int i = 0; i < parallelCnt; i++) {
                 if (useQry) {
-                    CacheContinuousQuery<Object, Object> qry = cache.queries().createContinuousQuery();
+                    ContinuousQuery<Object, Object> qry = Query.continuous();
 
-                    qry.localCallback(new PX2<UUID, Collection<CacheContinuousQueryEntry<Object, Object>>>() {
-                        @Override public boolean applyx(UUID uuid, Collection<CacheContinuousQueryEntry<Object, Object>> entries)
-                            throws IgniteInterruptedCheckedException {
-                            if (cbSleepMs > 0)
-                                U.sleep(cbSleepMs);
-
-                            cbCntr.addAndGet(entries.size());
+                    qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                            if (cbSleepMs > 0) {
+                                try {
+                                    U.sleep(cbSleepMs);
+                                }
+                                catch (IgniteInterruptedCheckedException e) {
+                                    throw new IgniteException(e);
+                                }
+                            }
 
-                            return true; // Continue listening.
+                            for (CacheEntryEvent<?, ?> ignored : evts)
+                                cbCntr.incrementAndGet();
                         }
                     });
 
-                    qry.remoteFilter(new IgnitePredicateX<CacheContinuousQueryEntry<Object, Object>>() {
-                        @Override public boolean applyx(CacheContinuousQueryEntry e) throws IgniteInterruptedCheckedException {
-                            if (filterSleepMs > 0)
-                                U.sleep(filterSleepMs);
+                    qry.setRemoteFilter(new CacheEntryEventFilter<Object, Object>() {
+                        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
+                            if (filterSleepMs > 0) {
+                                try {
+                                    U.sleep(filterSleepMs);
+                                }
+                                catch (IgniteInterruptedCheckedException e) {
+                                    throw new IgniteException(e);
+                                }
+                            }
 
                             return Math.random() * 100 >= filterSkipProb;
                         }
                     });
 
-                    qry.bufferSize(bufSize);
-                    qry.timeInterval(timeInterval);
+                    qry.setBufferSize(bufSize);
+                    qry.setTimeInterval(timeInterval);
 
-                    qry.execute();
+                    cache.query(qry);
                 }
                 else {
                     ignite.events().remoteListen(
@@ -188,7 +198,7 @@ public class GridContinuousOperationsLoadTest {
                     while (!stop.get() && !Thread.currentThread().isInterrupted()) {
                         Integer key = rnd.nextInt(keyRange);
 
-                        cache.putx(key, val);
+                        cache.put(key, val);
 
                         updCntr.incrementAndGet();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 884ceca..8ba8966 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -64,7 +64,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
                 new CacheConfiguration()),
             new GridCacheEvictionManager<K, V>(),
             new GridCacheLocalQueryManager<K, V>(),
-            new GridCacheContinuousQueryManager<K, V>(),
+            new CacheContinuousQueryManager<K, V>(),
             new GridCacheAffinityManager<K, V>(),
             new CacheDataStructuresManager<K, V>(),
             new GridCacheTtlManager<K, V>(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f649be2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
index 3af9835..6ec4058 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
@@ -19,8 +19,8 @@ package org.apache.ignite.internal.processors.hadoop.jobtracker;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.query.*;
 import org.apache.ignite.events.*;
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.cache.*;
@@ -36,6 +36,7 @@ import org.apache.ignite.lang.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.event.*;
 import javax.cache.expiry.*;
 import javax.cache.processor.*;
 import java.io.*;
@@ -135,8 +136,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
                         ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy(
                             new Duration(MILLISECONDS, ctx.configuration().getFinishedJobInfoTtl()));
 
-                        finishedJobMetaPrj = ((GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata>)prj).
-                            withExpiryPolicy(finishedJobPlc);
+                        finishedJobMetaPrj = prj.withExpiryPolicy(finishedJobPlc);
                     }
                     else
                         finishedJobMetaPrj = jobMetaPrj;
@@ -169,14 +169,12 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
     @Override public void onKernalStart() throws IgniteCheckedException {
         super.onKernalStart();
 
-        CacheContinuousQuery<GridHadoopJobId, GridHadoopJobMetadata> qry = jobMetaCache().queries().createContinuousQuery();
-
-        qry.localCallback(
-            new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<GridHadoopJobId, GridHadoopJobMetadata>>>() {
-                @Override public boolean apply(UUID nodeId,
-                    final Collection<CacheContinuousQueryEntry<GridHadoopJobId, GridHadoopJobMetadata>> evts) {
+        jobMetaCache().context().continuousQueries().executeInternalQuery(
+            new CacheEntryUpdatedListener<GridHadoopJobId, GridHadoopJobMetadata>() {
+                @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends GridHadoopJobId,
+                    ? extends GridHadoopJobMetadata>> evts) {
                     if (!busyLock.tryReadLock())
-                        return false;
+                        return;
 
                     try {
                         // Must process query callback in a separate thread to avoid deadlocks.
@@ -185,16 +183,15 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
                                 processJobMetadataUpdates(evts);
                             }
                         });
-
-                        return true;
                     }
                     finally {
                         busyLock.readUnlock();
                     }
                 }
-            });
-
-        qry.execute(ctx.kernalContext().grid().forLocal());
+            },
+            null,
+            true
+        );
 
         ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
             @Override public void onEvent(final Event evt) {
@@ -629,11 +626,11 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
      * @throws IgniteCheckedException If failed.
      */
     private void processJobMetadataUpdates(
-        Iterable<CacheContinuousQueryEntry<GridHadoopJobId, GridHadoopJobMetadata>> updated)
+        Iterable<CacheEntryEvent<? extends GridHadoopJobId, ? extends GridHadoopJobMetadata>> updated)
         throws IgniteCheckedException {
         UUID locNodeId = ctx.localNodeId();
 
-        for (Map.Entry<GridHadoopJobId, GridHadoopJobMetadata> entry : updated) {
+        for (CacheEntryEvent<? extends GridHadoopJobId, ? extends GridHadoopJobMetadata> entry : updated) {
             GridHadoopJobId jobId = entry.getKey();
             GridHadoopJobMetadata meta = entry.getValue();
 


Mime
View raw message