ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: IGNITE-2004 Added async query example.
Date Fri, 15 Apr 2016 11:27:57 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2004 126dc4b66 -> 7e68b9896


IGNITE-2004 Added async query example.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e68b989
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e68b989
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e68b989

Branch: refs/heads/ignite-2004
Commit: 7e68b98960519274d2803c8bf325287fb93af5cd
Parents: 126dc4b
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Fri Apr 15 14:28:00 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Fri Apr 15 14:28:00 2016 +0300

----------------------------------------------------------------------
 .../CacheContinuousAsyncQueryExample.java       | 137 +++++++++++++++++++
 .../datagrid/CacheContinuousQueryExample.java   |  14 +-
 2 files changed, 148 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7e68b989/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java
b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java
new file mode 100644
index 0000000..32286e3
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datagrid;
+
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * This examples demonstrates asynchronous continuous query API.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
+ */
+public class CacheContinuousAsyncQueryExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = CacheContinuousAsyncQueryExample.class.getSimpleName();
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     * @throws Exception If example execution failed.
+     */
+    public static void main(String[] args) throws Exception {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache continuous query example started.");
+
+            // Auto-close cache at the end of the example.
+            try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME))
{
+                int keyCnt = 20;
+
+                // These entries will be queried by initial predicate.
+                for (int i = 0; i < keyCnt; i++)
+                    cache.put(i, Integer.toString(i));
+
+                // Create new continuous query.
+                ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
+
+                qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer,
String>() {
+                    @Override public boolean apply(Integer key, String val) {
+                        return key > 10;
+                    }
+                }));
+
+                // Callback that is called locally when update notifications are received.
+                qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>()
{
+                    @Override public void onUpdated(Iterable<CacheEntryEvent<? extends
Integer, ? extends String>> evts) {
+                        for (CacheEntryEvent<? extends Integer, ? extends String> e
: evts)
+                            System.out.println("Updated entry [key=" + e.getKey() + ", val="
+ e.getValue() + ']');
+                    }
+                });
+
+                // This filter will be evaluated remotely on all nodes.
+                // Entry that pass this filter will be sent to the caller.
+                qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer,
String>>() {
+                    @Override public CacheEntryEventFilter<Integer, String> create()
{
+                        return new CacheEntryFilter();
+                    }
+                });
+
+                // Execute query.
+                try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry))
{
+                    // Iterate through existing data.
+                    for (Cache.Entry<Integer, String> e : cur)
+                        System.out.println("Queried existing entry [key=" + e.getKey() +
", val=" + e.getValue() + ']');
+
+                    // Add a few more keys and watch more query notifications.
+                    for (int i = 0; i < keyCnt; i++)
+                        cache.put(i, Integer.toString(i));
+
+                    // Wait for a while while callback is notified about remaining puts.
+                    Thread.sleep(2000);
+                }
+
+                for (int i = 0; i < 10; i++)
+                    System.out.println("Entry updated from filter [key=" + i + ", val=" +
cache.get(i) + ']');
+            }
+            finally {
+                // Distributed cache could be removed from cluster only by #destroyCache()
call.
+                ignite.destroyCache(CACHE_NAME);
+            }
+        }
+    }
+
+    /**
+     * Filter returns {@code true} for entries which have key bigger than 10.
+     */
+    @IgniteAsyncCallback
+    private static class CacheEntryFilter implements CacheEntryEventFilter<Integer, String>
{
+        /** Ignite instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends
String> e)
+            throws CacheEntryListenerException {
+            // This cache operation is safe because filter has Ignite AsyncCallback annotation.
+            if (e.getKey() < 10 && String.valueOf(e.getKey()).equals(e.getValue()))
+                ignite.cache(CACHE_NAME).put(e.getKey(), e.getValue() + "_less_than_10");
+
+            return e.getKey() > 10;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e68b989/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
index 59759af..6db968d 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java
@@ -18,7 +18,10 @@
 package org.apache.ignite.examples.datagrid;
 
 import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
 import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -28,6 +31,7 @@ import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteBiPredicate;
 
 /**
@@ -81,9 +85,13 @@ public class CacheContinuousQueryExample {
 
                 // This filter will be evaluated remotely on all nodes.
                 // Entry that pass this filter will be sent to the caller.
-                qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer, String>()
{
-                    @Override public boolean evaluate(CacheEntryEvent<? extends Integer,
? extends String> e) {
-                        return e.getKey() > 10;
+                qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer,
String>>() {
+                    @Override public CacheEntryEventFilter<Integer, String> create()
{
+                        return new CacheEntryEventFilter<Integer, String>() {
+                            @Override public boolean evaluate(CacheEntryEvent<? extends
Integer, ? extends String> e) {
+                                return e.getKey() > 10;
+                            }
+                        };
                     }
                 });
 


Mime
View raw message