ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-1312: Moved continuous queries to Ignite.
Date Fri, 28 Aug 2015 06:23:45 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 3e30c863b -> f4c7107ce


IGNITE-1312: Moved continuous queries to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f4c7107c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f4c7107c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f4c7107c

Branch: refs/heads/master
Commit: f4c7107ce90f54f6cff7cd83d18b025011b2c7cf
Parents: 3e30c86
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Fri Aug 28 09:24:25 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Fri Aug 28 09:24:25 2015 +0300

----------------------------------------------------------------------
 .../processors/platform/PlatformContext.java    |  19 ++
 .../processors/platform/PlatformTarget.java     |  76 +++++++
 .../cache/query/PlatformContinuousQuery.java    |  58 +++++
 .../processors/platform/PlatformTarget.java     |  76 -------
 .../query/PlatformContinuousQueryImpl.java      | 222 +++++++++++++++++++
 .../PlatformContinuousQueryRemoteFilter.java    | 183 +++++++++++++++
 6 files changed, 558 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c7107c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index 504f79e..461fb84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.platform;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.cache.query.continuous.*;
+import org.apache.ignite.internal.processors.platform.cache.query.*;
 import org.apache.ignite.internal.processors.platform.callback.*;
 import org.apache.ignite.internal.processors.platform.memory.*;
 import org.jetbrains.annotations.*;
@@ -135,4 +137,21 @@ public interface PlatformContext {
      * @param metrics Metrics.
      */
     public void writeClusterMetrics(PortableRawWriterEx writer, @Nullable ClusterMetrics
metrics);
+
+    /**
+     * Create platform continuous query.
+     *
+     * @param ptr Pointer to continuous query deployed on the platform.
+     * @param hasFilter Whether filter exists.
+     * @param filter Filter.
+     * @return Platform continuous query.
+     */
+    public PlatformContinuousQuery createContinuousQuery(long ptr, boolean hasFilter, @Nullable
Object filter);
+
+    /**
+     * Create continuous query filter to be deployed on remote node.
+     *
+     * @param filter Native filter.
+     * @return Continuous query filter.
+     */
+    public CacheContinuousQueryFilterEx createContinuousQueryFilter(Object filter);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c7107c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
new file mode 100644
index 0000000..1d54b4e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import org.jetbrains.annotations.*;
+
+/**
+ * Interop target abstraction.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public interface PlatformTarget {
+    /**
+     * Synchronous IN operation.
+     *
+     * @param type Operation type.
+     * @param memPtr Memory pointer.
+     * @return Value specific for the given operation.
+     * @throws Exception If failed.
+     */
+    public int inOp(int type, long memPtr) throws Exception;
+
+    /**
+     * Synchronous IN operation which returns managed object as result.
+     *
+     * @param type Operation type.
+     * @param memPtr Memory pointer.
+     * @return Managed result.
+     * @throws Exception In case of failure.
+     */
+    public Object inOpObject(int type, long memPtr) throws Exception;
+
+    /**
+     * Synchronous OUT operation.
+     *
+     * @param type Operation type.
+     * @param memPtr Memory pointer.
+     * @throws Exception In case of failure.
+     */
+    public void outOp(int type, long memPtr) throws Exception;
+
+    /**
+     * Synchronous IN-OUT operation.
+     *
+     * @param type Operation type.
+     * @param inMemPtr Input memory pointer.
+     * @param outMemPtr Output memory pointer.
+     * @throws Exception In case of failure.
+     */
+    public void inOutOp(int type, long inMemPtr, long outMemPtr) throws Exception;
+
+    /**
+     * Synchronous IN-OUT operation with optional argument.
+     *
+     * @param type Operation type.
+     * @param inMemPtr Input memory pointer.
+     * @param outMemPtr Output memory pointer.
+     * @param arg Argument (optional).
+     * @throws Exception In case of failure.
+     */
+    public void inOutOp(int type, long inMemPtr, long outMemPtr, @Nullable Object arg) throws
Exception;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c7107c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQuery.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQuery.java
new file mode 100644
index 0000000..0b55aea
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQuery.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.query.continuous.*;
+import org.apache.ignite.internal.processors.platform.*;
+
+import javax.cache.event.*;
+
+/**
+ * Platform continuous query. Acts as both the entry update listener and the filter of the query.
+ */
+public interface PlatformContinuousQuery extends CacheEntryUpdatedListener, CacheContinuousQueryFilterEx
{
+    /**
+     * Start continuous query execution.
+     *
+     * @param cache Cache.
+     * @param loc Local flag.
+     * @param bufSize Buffer size.
+     * @param timeInterval Time interval.
+     * @param autoUnsubscribe Auto-unsubscribe flag.
+     * @param initialQry Initial query.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void start(IgniteCacheProxy cache, boolean loc, int bufSize, long timeInterval,
boolean autoUnsubscribe,
+        Query initialQry) throws IgniteCheckedException;
+
+    /**
+     * Close continuous query.
+     */
+    public void close();
+
+    /**
+     * Gets initial query cursor (if any).
+     *
+     * @return Initial query cursor, or {@code null} if there is none.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public PlatformTarget getInitialQueryCursor();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c7107c/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
deleted file mode 100644
index 1d54b4e..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform;
-
-import org.jetbrains.annotations.*;
-
-/**
- * Interop target abstraction.
- */
-@SuppressWarnings("UnusedDeclaration")
-public interface PlatformTarget {
-    /**
-     * Synchronous IN operation.
-     *
-     * @param type Operation type.
-     * @param memPtr Memory pointer.
-     * @return Value specific for the given operation otherwise.
-     * @throws Exception If failed.
-     */
-    public int inOp(int type, long memPtr) throws Exception;
-
-    /**
-     * Synchronous IN operation which returns managed object as result.
-     *
-     * @param type Operation type.
-     * @param memPtr Memory pointer.
-     * @return Managed result.
-     * @throws Exception If case of failure.
-     */
-    public Object inOpObject(int type, long memPtr) throws Exception;
-
-    /**
-     * Synchronous OUT operation.
-     *
-     * @param type Operation type.
-     * @param memPtr Memory pointer.
-     * @throws Exception In case of failure.
-     */
-    public void outOp(int type, long memPtr) throws Exception;
-
-    /**
-     * Synchronous IN-OUT operation.
-     *
-     * @param type Operation type.
-     * @param inMemPtr Input memory pointer.
-     * @param outMemPtr Output memory pointer.
-     * @throws Exception In case of failure.
-     */
-    public void inOutOp(int type, long inMemPtr, long outMemPtr) throws Exception;
-
-    /**
-     * Synchronous IN-OUT operation with optional argument.
-     *
-     * @param type Operation type.
-     * @param inMemPtr Input memory pointer.
-     * @param outMemPtr Output memory pointer.
-     * @param arg Argument (optional).
-     * @throws Exception In case of failure.
-     */
-    public void inOutOp(int type, long inMemPtr, long outMemPtr, @Nullable Object arg) throws
Exception;
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c7107c/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
new file mode 100644
index 0000000..b2fa1e3
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
@@ -0,0 +1,222 @@
+/*
+ *  Copyright (C) GridGain Systems. All Rights Reserved.
+ *  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.processors.query.*;
+
+import javax.cache.*;
+import javax.cache.event.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Interop continuous query handle.
+ */
+public class PlatformContinuousQueryImpl implements PlatformContinuousQuery {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Context. */
+    protected final PlatformContext platformCtx;
+
+    /** Whether filter exists. */
+    private final boolean hasFilter;
+
+    /** Native filter in serialized form. If null, then filter is either not set, or this
is local query. */
+    protected final Object filter;
+
+    /** Pointer to native counterpart; zero if closed. */
+    private long ptr;
+
+    /** Cursor to handle filter close. */
+    private QueryCursor cursor;
+
+    /** Lock for concurrency control. */
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /** Wrapped initial qry cursor. */
+    private PlatformQueryCursor initialQryCur;
+
+    /**
+     * Constructor.
+     *
+     * @param platformCtx Context.
+     * @param ptr Pointer to native counterpart.
+     * @param hasFilter Whether filter exists.
+     * @param filter Filter.
+     */
+    public PlatformContinuousQueryImpl(PlatformContext platformCtx, long ptr, boolean hasFilter,
Object filter) {
+        assert ptr != 0L;
+
+        this.platformCtx = platformCtx;
+        this.ptr = ptr;
+        this.hasFilter = hasFilter;
+        this.filter = filter;
+    }
+
+    /**
+     * Start query execution.
+     *
+     * @param cache Cache.
+     * @param loc Local flag.
+     * @param bufSize Buffer size.
+     * @param timeInterval Time interval.
+     * @param autoUnsubscribe Auto-unsubscribe flag.
+     * @param initialQry Initial query.
+     */
+    @SuppressWarnings("unchecked")
+    public void start(IgniteCacheProxy cache, boolean loc, int bufSize, long timeInterval,
boolean autoUnsubscribe,
+        Query initialQry) throws IgniteCheckedException {
+        assert !loc || filter == null;
+
+        lock.writeLock().lock();
+
+        try {
+            try {
+                ContinuousQuery qry = new ContinuousQuery();
+
+                qry.setLocalListener(this);
+                qry.setRemoteFilter(this); // Filter must be set always for correct resource
release.
+                qry.setPageSize(bufSize);
+                qry.setTimeInterval(timeInterval);
+                qry.setAutoUnsubscribe(autoUnsubscribe);
+                qry.setInitialQuery(initialQry);
+
+                cursor = cache.query(qry.setLocal(loc));
+
+                if (initialQry != null)
+                    initialQryCur = new PlatformQueryCursor(platformCtx, new QueryCursorEx<Cache.Entry>()
{
+                        @Override public Iterator<Cache.Entry> iterator() {
+                            return cursor.iterator();
+                        }
+
+                        @Override public List<Cache.Entry> getAll() {
+                            return cursor.getAll();
+                        }
+
+                        @Override public void close() {
+                            // No-op: do not close whole continuous query when initial query
cursor closes.
+                        }
+
+                        @Override public void getAll(Consumer<Cache.Entry> clo) throws
IgniteCheckedException {
+                            for (Cache.Entry t : this)
+                                clo.consume(t);
+                        }
+
+                        @Override public List<GridQueryFieldMetadata> fieldsMeta()
{
+                            return null;
+                        }
+                    }, initialQry.getPageSize() > 0 ? initialQry.getPageSize() : Query.DFLT_PAGE_SIZE);
+            }
+            catch (Exception e) {
+                try
+                {
+                    close0();
+                }
+                catch (Exception ignored)
+                {
+                    // Ignore
+                }
+
+                throw PlatformUtils.unwrapQueryException(e);
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void onUpdated(Iterable evts) throws CacheEntryListenerException {
+        lock.readLock().lock();
+
+        try {
+            if (ptr == 0)
+                throw new CacheEntryListenerException("Failed to notify listener because
it has been closed.");
+
+            PlatformUtils.applyContinuousQueryEvents(platformCtx, ptr, evts);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException
{
+        lock.readLock().lock();
+
+        try {
+            if (ptr == 0)
+                throw new CacheEntryListenerException("Failed to evaluate the filter because
it has been closed.");
+
+            return !hasFilter || PlatformUtils.evaluateContinuousQueryEvent(platformCtx,
ptr, evt);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onQueryUnregister() {
+        close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        lock.writeLock().lock();
+
+        try {
+            close0();
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"UnusedDeclaration", "unchecked"})
+    @Override public PlatformTarget getInitialQueryCursor() {
+        return initialQryCur;
+    }
+
+    /**
+     * Internal close routine.
+     */
+    private void close0() {
+        if (ptr != 0) {
+            long ptr0 = ptr;
+
+            ptr = 0;
+
+            if (cursor != null)
+                cursor.close();
+
+            platformCtx.gateway().continuousQueryFilterRelease(ptr0);
+        }
+    }
+
+    /**
+     * Replacer for remote filter.
+     *
+     * @return Filter to be deployed on remote node.
+     * @throws ObjectStreamException If failed.
+     */
+    Object writeReplace() throws ObjectStreamException {
+        return filter == null ? null : platformCtx.createContinuousQueryFilter(filter);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c7107c/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java
b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java
new file mode 100644
index 0000000..0f19218
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.cache.query.continuous.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
+
+import javax.cache.event.*;
+import java.io.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Continuous query filter deployed on remote nodes.
+ * <p>
+ * The native filter is created lazily on the first {@link #evaluate(CacheEntryEvent)} call and released
+ * in {@link #onQueryUnregister()}.
+ */
+public class PlatformContinuousQueryRemoteFilter implements CacheContinuousQueryFilterEx, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Lock for concurrency control. */
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /** Native filter in serialized form. */
+    private Object filter;
+
+    /** Grid hosting the filter. */
+    @IgniteInstanceResource
+    private transient Ignite grid;
+
+    /** Native platform pointer; zero until deployed and after close. */
+    private transient volatile long ptr;
+
+    /** Close flag. Once set, no requests to the native platform are possible. */
+    private transient boolean closed;
+
+    /**
+     * {@link java.io.Externalizable} support.
+     */
+    public PlatformContinuousQueryRemoteFilter() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param filter Serialized native filter.
+     */
+    public PlatformContinuousQueryRemoteFilter(Object filter) {
+        assert filter != null;
+
+        this.filter = filter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException {
+        // Cheap unlocked check; deploy() re-checks the pointer under the write lock.
+        if (ptr == 0)
+            deploy();
+
+        lock.readLock().lock();
+
+        try {
+            if (closed)
+                throw new CacheEntryListenerException("Failed to evaluate the filter because it has been closed.");
+
+            PlatformContext platformCtx = PlatformUtils.platformContext(grid);
+
+            return PlatformUtils.evaluateContinuousQueryEvent(platformCtx, ptr, evt);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Deploy filter to native platform. Does nothing if the filter is already deployed.
+     *
+     * @throws CacheEntryListenerException If the filter has been closed or deployment failed.
+     */
+    private void deploy() {
+        lock.writeLock().lock();
+
+        try {
+            // 1. Do not deploy if the filter has been closed concurrently.
+            if (closed)
+                throw new CacheEntryListenerException("Failed to deploy the filter because it has been closed.");
+
+            // 2. Do not deploy twice: another thread may have deployed the filter while this one was
+            // waiting for the write lock. Overwriting the pointer would leak the first native filter.
+            if (ptr != 0)
+                return;
+
+            // 3. Deploy.
+            PlatformContext ctx = PlatformUtils.platformContext(grid);
+
+            try (PlatformMemory mem = ctx.memory().allocate()) {
+                PlatformOutputStream out = mem.output();
+
+                PortableRawWriterEx writer = ctx.writer(out);
+
+                writer.writeObject(filter);
+
+                out.synchronize();
+
+                ptr = ctx.gateway().continuousQueryFilterCreate(mem.pointer());
+            }
+            catch (Exception e) {
+                // 4. Close in case of failure.
+                close();
+
+                throw new CacheEntryListenerException("Failed to deploy the filter.", e);
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onQueryUnregister() {
+        lock.writeLock().lock();
+
+        try {
+            close();
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Close the filter. Callers in this class invoke it while holding the write lock.
+     */
+    private void close() {
+        if (!closed) {
+            try {
+                if (ptr != 0) {
+                    try {
+                        PlatformUtils.platformContext(grid).gateway().continuousQueryFilterRelease(ptr);
+                    }
+                    finally {
+                        // Nullify the pointer in any case.
+                        ptr = 0;
+                    }
+                }
+            }
+            finally {
+                closed = true;
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        filter = in.readObject();
+
+        assert filter != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(PlatformContinuousQueryRemoteFilter.class, this);
+    }
+}


Mime
View raw message