Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8AF0C1883F for ; Fri, 28 Aug 2015 14:01:38 +0000 (UTC) Received: (qmail 61599 invoked by uid 500); 28 Aug 2015 14:01:33 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 61517 invoked by uid 500); 28 Aug 2015 14:01:33 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 60671 invoked by uid 99); 28 Aug 2015 14:01:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Aug 2015 14:01:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 04559E7EB7; Fri, 28 Aug 2015 14:01:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Fri, 28 Aug 2015 14:01:54 -0000 Message-Id: <4dea1aa0989046f99065df145eff645c@git.apache.org> In-Reply-To: <8264eff31b714868ae9fd71b399b845f@git.apache.org> References: <8264eff31b714868ae9fd71b399b845f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [23/33] ignite git commit: IGNITE-1312: Moved continuous queries to Ignite. IGNITE-1312: Moved continuous queries to Ignite. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f4c7107c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f4c7107c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f4c7107c Branch: refs/heads/ignite-1093 Commit: f4c7107ce90f54f6cff7cd83d18b025011b2c7cf Parents: 3e30c86 Author: vozerov-gridgain Authored: Fri Aug 28 09:24:25 2015 +0300 Committer: vozerov-gridgain Committed: Fri Aug 28 09:24:25 2015 +0300 ---------------------------------------------------------------------- .../processors/platform/PlatformContext.java | 19 ++ .../processors/platform/PlatformTarget.java | 76 +++++++ .../cache/query/PlatformContinuousQuery.java | 58 +++++ .../processors/platform/PlatformTarget.java | 76 ------- .../query/PlatformContinuousQueryImpl.java | 222 +++++++++++++++++++ .../PlatformContinuousQueryRemoteFilter.java | 183 +++++++++++++++ 6 files changed, 558 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c7107c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java index 504f79e..461fb84 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.platform; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.cache.query.continuous.*; +import org.apache.ignite.internal.processors.platform.cache.query.*; import org.apache.ignite.internal.processors.platform.callback.*; import org.apache.ignite.internal.processors.platform.memory.*; import org.jetbrains.annotations.*; @@ -135,4 +137,21 @@ public interface PlatformContext { * @param metrics Metrics. */ public void writeClusterMetrics(PortableRawWriterEx writer, @Nullable ClusterMetrics metrics); + + /** + * + * @param ptr Pointer to continuous query deployed on the platform. + * @param hasFilter Whether filter exists. + * @param filter Filter. + * @return Platform continuous query. + */ + public PlatformContinuousQuery createContinuousQuery(long ptr, boolean hasFilter, @Nullable Object filter); + + /** + * Create continuous query filter to be deployed on remote node. + * + * @param filter Native filter. + * @return Filter. + */ + public CacheContinuousQueryFilterEx createContinuousQueryFilter(Object filter); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c7107c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java new file mode 100644 index 0000000..1d54b4e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.jetbrains.annotations.*; + +/** + * Interop target abstraction. + */ +@SuppressWarnings("UnusedDeclaration") +public interface PlatformTarget { + /** + * Synchronous IN operation. + * + * @param type Operation type. + * @param memPtr Memory pointer. + * @return Value specific for the given operation otherwise. + * @throws Exception If failed. + */ + public int inOp(int type, long memPtr) throws Exception; + + /** + * Synchronous IN operation which returns managed object as result. + * + * @param type Operation type. + * @param memPtr Memory pointer. + * @return Managed result. + * @throws Exception If case of failure. + */ + public Object inOpObject(int type, long memPtr) throws Exception; + + /** + * Synchronous OUT operation. + * + * @param type Operation type. + * @param memPtr Memory pointer. + * @throws Exception In case of failure. + */ + public void outOp(int type, long memPtr) throws Exception; + + /** + * Synchronous IN-OUT operation. + * + * @param type Operation type. + * @param inMemPtr Input memory pointer. + * @param outMemPtr Output memory pointer. + * @throws Exception In case of failure. + */ + public void inOutOp(int type, long inMemPtr, long outMemPtr) throws Exception; + + /** + * Synchronous IN-OUT operation with optional argument. + * + * @param type Operation type. + * @param inMemPtr Input memory pointer. + * @param outMemPtr Output memory pointer. + * @param arg Argument (optional). + * @throws Exception In case of failure. + */ + public void inOutOp(int type, long inMemPtr, long outMemPtr, @Nullable Object arg) throws Exception; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c7107c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQuery.java new file mode 100644 index 0000000..0b55aea --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQuery.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.query; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.query.continuous.*; +import org.apache.ignite.internal.processors.platform.*; + +import javax.cache.event.*; + +/** + * Platform continuous query. + */ +public interface PlatformContinuousQuery extends CacheEntryUpdatedListener, CacheContinuousQueryFilterEx { + /** + * Start continuous query execution. + * + * @param cache Cache. + * @param loc Local flag. + * @param bufSize Buffer size. + * @param timeInterval Time interval. + * @param autoUnsubscribe Auto-unsubscribe flag. + * @param initialQry Initial query. + * @throws org.apache.ignite.IgniteCheckedException If failed. + */ + public void start(IgniteCacheProxy cache, boolean loc, int bufSize, long timeInterval, boolean autoUnsubscribe, + Query initialQry) throws IgniteCheckedException; + + /** + * Close continuous query. + */ + public void close(); + + /** + * Gets initial query cursor (if any). + * + * @return Initial query cursor. + */ + @SuppressWarnings("UnusedDeclaration") + public PlatformTarget getInitialQueryCursor(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c7107c/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java deleted file mode 100644 index 1d54b4e..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.platform; - -import org.jetbrains.annotations.*; - -/** - * Interop target abstraction. - */ -@SuppressWarnings("UnusedDeclaration") -public interface PlatformTarget { - /** - * Synchronous IN operation. - * - * @param type Operation type. - * @param memPtr Memory pointer. - * @return Value specific for the given operation otherwise. - * @throws Exception If failed. - */ - public int inOp(int type, long memPtr) throws Exception; - - /** - * Synchronous IN operation which returns managed object as result. - * - * @param type Operation type. - * @param memPtr Memory pointer. - * @return Managed result. - * @throws Exception If case of failure. - */ - public Object inOpObject(int type, long memPtr) throws Exception; - - /** - * Synchronous OUT operation. - * - * @param type Operation type. - * @param memPtr Memory pointer. - * @throws Exception In case of failure. - */ - public void outOp(int type, long memPtr) throws Exception; - - /** - * Synchronous IN-OUT operation. - * - * @param type Operation type. - * @param inMemPtr Input memory pointer. - * @param outMemPtr Output memory pointer. - * @throws Exception In case of failure. - */ - public void inOutOp(int type, long inMemPtr, long outMemPtr) throws Exception; - - /** - * Synchronous IN-OUT operation with optional argument. - * - * @param type Operation type. - * @param inMemPtr Input memory pointer. - * @param outMemPtr Output memory pointer. - * @param arg Argument (optional). - * @throws Exception In case of failure. - */ - public void inOutOp(int type, long inMemPtr, long outMemPtr, @Nullable Object arg) throws Exception; -} http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c7107c/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java new file mode 100644 index 0000000..b2fa1e3 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java @@ -0,0 +1,222 @@ +/* + * Copyright (C) GridGain Systems. All Rights Reserved. + * _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.platform.cache.query; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.utils.*; +import org.apache.ignite.internal.processors.query.*; + +import javax.cache.*; +import javax.cache.event.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.locks.*; + +/** + * Interop continuous query handle. + */ +public class PlatformContinuousQueryImpl implements PlatformContinuousQuery { + /** */ + private static final long serialVersionUID = 0L; + + /** Context. */ + protected final PlatformContext platformCtx; + + /** Whether filter exists. */ + private final boolean hasFilter; + + /** Native filter in serialized form. If null, then filter is either not set, or this is local query. */ + protected final Object filter; + + /** Pointer to native counterpart; zero if closed. */ + private long ptr; + + /** Cursor to handle filter close. */ + private QueryCursor cursor; + + /** Lock for concurrency control. */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** Wrapped initial qry cursor. */ + private PlatformQueryCursor initialQryCur; + + /** + * Constructor. + * + * @param platformCtx Context. + * @param ptr Pointer to native counterpart. + * @param hasFilter Whether filter exists. + * @param filter Filter. + */ + public PlatformContinuousQueryImpl(PlatformContext platformCtx, long ptr, boolean hasFilter, Object filter) { + assert ptr != 0L; + + this.platformCtx = platformCtx; + this.ptr = ptr; + this.hasFilter = hasFilter; + this.filter = filter; + } + + /** + * Start query execution. + * + * @param cache Cache. + * @param loc Local flag. + * @param bufSize Buffer size. + * @param timeInterval Time interval. + * @param autoUnsubscribe Auto-unsubscribe flag. + * @param initialQry Initial query. + */ + @SuppressWarnings("unchecked") + public void start(IgniteCacheProxy cache, boolean loc, int bufSize, long timeInterval, boolean autoUnsubscribe, + Query initialQry) throws IgniteCheckedException { + assert !loc || filter == null; + + lock.writeLock().lock(); + + try { + try { + ContinuousQuery qry = new ContinuousQuery(); + + qry.setLocalListener(this); + qry.setRemoteFilter(this); // Filter must be set always for correct resource release. + qry.setPageSize(bufSize); + qry.setTimeInterval(timeInterval); + qry.setAutoUnsubscribe(autoUnsubscribe); + qry.setInitialQuery(initialQry); + + cursor = cache.query(qry.setLocal(loc)); + + if (initialQry != null) + initialQryCur = new PlatformQueryCursor(platformCtx, new QueryCursorEx() { + @Override public Iterator iterator() { + return cursor.iterator(); + } + + @Override public List getAll() { + return cursor.getAll(); + } + + @Override public void close() { + // No-op: do not close whole continuous query when initial query cursor closes. + } + + @Override public void getAll(Consumer clo) throws IgniteCheckedException { + for (Cache.Entry t : this) + clo.consume(t); + } + + @Override public List fieldsMeta() { + return null; + } + }, initialQry.getPageSize() > 0 ? initialQry.getPageSize() : Query.DFLT_PAGE_SIZE); + } + catch (Exception e) { + try + { + close0(); + } + catch (Exception ignored) + { + // Ignore + } + + throw PlatformUtils.unwrapQueryException(e); + } + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void onUpdated(Iterable evts) throws CacheEntryListenerException { + lock.readLock().lock(); + + try { + if (ptr == 0) + throw new CacheEntryListenerException("Failed to notify listener because it has been closed."); + + PlatformUtils.applyContinuousQueryEvents(platformCtx, ptr, evts); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException { + lock.readLock().lock(); + + try { + if (ptr == 0) + throw new CacheEntryListenerException("Failed to evaluate the filter because it has been closed."); + + return !hasFilter || PlatformUtils.evaluateContinuousQueryEvent(platformCtx, ptr, evt); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void onQueryUnregister() { + close(); + } + + /** {@inheritDoc} */ + @Override public void close() { + lock.writeLock().lock(); + + try { + close0(); + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"UnusedDeclaration", "unchecked"}) + @Override public PlatformTarget getInitialQueryCursor() { + return initialQryCur; + } + + /** + * Internal close routine. + */ + private void close0() { + if (ptr != 0) { + long ptr0 = ptr; + + ptr = 0; + + if (cursor != null) + cursor.close(); + + platformCtx.gateway().continuousQueryFilterRelease(ptr0); + } + } + + /** + * Replacer for remote filter. + * + * @return Filter to be deployed on remote node. + * @throws ObjectStreamException If failed. + */ + Object writeReplace() throws ObjectStreamException { + return filter == null ? null : platformCtx.createContinuousQueryFilter(filter); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f4c7107c/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java new file mode 100644 index 0000000..0f19218 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.query; + +import org.apache.ignite.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.cache.query.continuous.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.memory.*; +import org.apache.ignite.internal.processors.platform.utils.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.resources.*; + +import javax.cache.event.*; +import java.io.*; +import java.util.concurrent.locks.*; + +/** + * Continuous query filter deployed on remote nodes. + */ +public class PlatformContinuousQueryRemoteFilter implements CacheContinuousQueryFilterEx, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Lock for concurrency control. */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** Native filter in serialized form. */ + private Object filter; + + /** Grid hosting the filter. */ + @IgniteInstanceResource + private transient Ignite grid; + + /** Native platform pointer. */ + private transient volatile long ptr; + + /** Close flag. Once set, none requests to native platform is possible. */ + private transient boolean closed; + + /** + * {@link java.io.Externalizable} support. + */ + public PlatformContinuousQueryRemoteFilter() { + // No-op. + } + + /** + * Constructor. + * + * @param filter Serialized native filter. + */ + public PlatformContinuousQueryRemoteFilter(Object filter) { + assert filter != null; + + this.filter = filter; + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException { + long ptr0 = ptr; + + if (ptr0 == 0) + deploy(); + + lock.readLock().lock(); + + try { + if (closed) + throw new CacheEntryListenerException("Failed to evaluate the filter because it has been closed."); + + PlatformContext platformCtx = PlatformUtils.platformContext(grid); + + return PlatformUtils.evaluateContinuousQueryEvent(platformCtx, ptr, evt); + } + finally { + lock.readLock().unlock(); + } + } + + /** + * Deploy filter to native platform. + */ + private void deploy() { + lock.writeLock().lock(); + + try { + // 1. Do not deploy if the filter has been closed concurrently. + if (closed) + throw new CacheEntryListenerException("Failed to deploy the filter because it has been closed."); + + // 2. Deploy. + PlatformContext ctx = PlatformUtils.platformContext(grid); + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writer.writeObject(filter); + + out.synchronize(); + + ptr = ctx.gateway().continuousQueryFilterCreate(mem.pointer()); + } + catch (Exception e) { + // 3. Close in case of failure. + close(); + + throw new CacheEntryListenerException("Failed to deploy the filter.", e); + } + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void onQueryUnregister() { + lock.writeLock().lock(); + + try { + close(); + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * Close the filter. + */ + private void close() { + if (!closed) { + try { + if (ptr != 0) { + try { + PlatformUtils.platformContext(grid).gateway().continuousQueryFilterRelease(ptr); + } + finally { + // Nullify the pointer in any case. + ptr = 0; + } + } + } + finally { + closed = true; + } + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(filter); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + filter = in.readObject(); + + assert filter != null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PlatformContinuousQueryRemoteFilter.class, this); + } +}