ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [13/14] ignite git commit: IGNITE-1513: Merged Java to core module.
Date Fri, 18 Sep 2015 10:04:17 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
new file mode 100644
index 0000000..ecdfc2c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -0,0 +1,1090 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cache.CachePartialUpdateException;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.Query;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.cache.CacheOperationContext;
+import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery;
+import org.apache.ignite.internal.processors.platform.cache.query.PlatformFieldsQueryCursor;
+import org.apache.ignite.internal.processors.platform.cache.query.PlatformQueryCursor;
+import org.apache.ignite.internal.processors.platform.PlatformNativeException;
+import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.internal.util.GridConcurrentFactory;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.lang.IgniteFuture;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.Cache;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Native cache wrapper implementation.
+ */
+@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources"})
+public class PlatformCache extends PlatformAbstractTarget {
+    /** Operation code: clear a single key ({@code cache.clear(key)}). */
+    public static final int OP_CLEAR = 1;
+
+    /** Operation code: clear a set of keys ({@code cache.clearAll(keys)}). */
+    public static final int OP_CLEAR_ALL = 2;
+
+    /** Operation code: {@code cache.containsKey(key)}. */
+    public static final int OP_CONTAINS_KEY = 3;
+
+    /** Operation code: {@code cache.containsKeys(keys)}. */
+    public static final int OP_CONTAINS_KEYS = 4;
+
+    /** Operation code: {@code cache.get(key)}. */
+    public static final int OP_GET = 5;
+
+    /** Operation code: {@code cache.getAll(keys)}; has a dedicated future writer. */
+    public static final int OP_GET_ALL = 6;
+
+    /** Operation code: {@code cache.getAndPut(key, val)}. */
+    public static final int OP_GET_AND_PUT = 7;
+
+    /** Operation code: {@code cache.getAndPutIfAbsent(key, val)}. */
+    public static final int OP_GET_AND_PUT_IF_ABSENT = 8;
+
+    /** Operation code: {@code cache.getAndRemove(key)}. */
+    public static final int OP_GET_AND_REMOVE = 9;
+
+    /** Operation code: {@code cache.getAndReplace(key, val)}. */
+    public static final int OP_GET_AND_REPLACE = 10;
+
+    /** Operation code: write the cache name to the output stream. */
+    public static final int OP_GET_NAME = 11;
+
+    /** Operation code: {@code cache.invoke(key, processor)}; has a dedicated future writer. */
+    public static final int OP_INVOKE = 12;
+
+    /** Operation code: {@code cache.invokeAll(keys, processor)}; has a dedicated future writer. */
+    public static final int OP_INVOKE_ALL = 13;
+
+    /** Operation code: {@code cache.isLocalLocked(key, flag)}. */
+    public static final int OP_IS_LOCAL_LOCKED = 14;
+
+    /** Operation code: {@code cache.loadCache(filter, args)}. */
+    public static final int OP_LOAD_CACHE = 15;
+
+    /** Operation code: {@code cache.localEvict(keys)}. */
+    public static final int OP_LOC_EVICT = 16;
+
+    /** Operation code: {@code cache.localLoadCache(filter, args)}. */
+    public static final int OP_LOC_LOAD_CACHE = 17;
+
+    /** Operation code: {@code cache.localPromote(keys)}. */
+    public static final int OP_LOC_PROMOTE = 18;
+
+    /** Operation code: {@code cache.localClear(key)}. NOTE(review): value 19 is not assigned in this listing. */
+    public static final int OP_LOCAL_CLEAR = 20;
+
+    /** Operation code: {@code cache.localClearAll(keys)}. */
+    public static final int OP_LOCAL_CLEAR_ALL = 21;
+
+    /** Operation code: create a lock for one key and write back its registered id. */
+    public static final int OP_LOCK = 22;
+
+    /** Operation code: create a lock for a collection of keys and write back its registered id. */
+    public static final int OP_LOCK_ALL = 23;
+
+    /** Operation code: write cache metrics to the output stream. */
+    public static final int OP_METRICS = 24;
+
+    /** Operation code: {@code cache.localPeek(key, modes)}. NOTE(review): private, unlike most codes - confirm intent. */
+    private static final int OP_PEEK = 25;
+
+    /** Operation code: {@code cache.put(key, val)}. NOTE(review): private, unlike most codes - confirm intent. */
+    private static final int OP_PUT = 26;
+
+    /** Operation code: {@code cache.putAll(map)}. NOTE(review): private, unlike most codes - confirm intent. */
+    private static final int OP_PUT_ALL = 27;
+
+    /** Operation code: {@code cache.putIfAbsent(key, val)}. */
+    public static final int OP_PUT_IF_ABSENT = 28;
+
+    /** Operation code: start a continuous query. */
+    public static final int OP_QRY_CONTINUOUS = 29;
+
+    /** Operation code: scan query; also used as the type code of an initial query. */
+    public static final int OP_QRY_SCAN = 30;
+
+    /** Operation code: SQL query; also used as the type code of an initial query. */
+    public static final int OP_QRY_SQL = 31;
+
+    /** Operation code: SQL fields query. */
+    public static final int OP_QRY_SQL_FIELDS = 32;
+
+    /** Operation code: text query; also used as the type code of an initial query. */
+    public static final int OP_QRY_TXT = 33;
+
+    /** Operation code: {@code cache.removeAll(keys)}. */
+    public static final int OP_REMOVE_ALL = 34;
+
+    /** Operation code: two-argument {@code cache.remove(key, val)}; the boolean result is returned. */
+    public static final int OP_REMOVE_BOOL = 35;
+
+    /** Operation code: one-argument {@code cache.remove(key)}; the boolean result is returned. */
+    public static final int OP_REMOVE_OBJ = 36;
+
+    /** Operation code: two-argument {@code cache.replace(...)}. */
+    public static final int OP_REPLACE_2 = 37;
+
+    /** Operation code: three-argument {@code cache.replace(...)}. */
+    public static final int OP_REPLACE_3 = 38;
+
+    /** Underlying JCache, held as the proxy type so proxy-only methods are callable. */
+    private final IgniteCacheProxy cache;
+
+    /** Whether this cache is created with "keepPortable" flag on the other side. */
+    private final boolean keepPortable;
+
+    /** Future writer returned for {@link #OP_GET_ALL}. */
+    private static final GetAllWriter WRITER_GET_ALL = new GetAllWriter();
+
+    /** Future writer returned for {@link #OP_INVOKE}. */
+    private static final EntryProcessorInvokeWriter WRITER_INVOKE = new EntryProcessorInvokeWriter();
+
+    /** Future writer returned for {@link #OP_INVOKE_ALL}. */
+    private static final EntryProcessorInvokeAllWriter WRITER_INVOKE_ALL = new EntryProcessorInvokeAllWriter();
+
+    /** Map with currently active locks, keyed by the id handed out on registration. */
+    private final ConcurrentMap<Long, Lock> lockMap = GridConcurrentFactory.newMap();
+
+    /** Lock ID sequence; static, so ids are unique across all instances of this class. */
+    private static final AtomicLong LOCK_ID_GEN = new AtomicLong();
+
+    /**
+     * Creates a platform wrapper around the given cache.
+     *
+     * @param platformCtx Platform context.
+     * @param cache Cache to wrap; it is cast to {@code IgniteCacheProxy}.
+     * @param keepPortable Value of the keep-portable flag for this wrapper.
+     */
+    public PlatformCache(PlatformContext platformCtx, IgniteCache cache, boolean keepPortable) {
+        super(platformCtx);
+
+        this.keepPortable = keepPortable;
+        this.cache = (IgniteCacheProxy)cache;
+    }
+
+    /**
+     * Gets cache with "skip-store" flag set.
+     *
+     * @return Cache with "skip-store" flag set.
+     */
+    public PlatformCache withSkipStore() {
+        if (cache.delegate().skipStore())
+            return this;
+
+        return new PlatformCache(platformCtx, cache.withSkipStore(), keepPortable);
+    }
+
+    /**
+     * Gets cache with "keep portable" flag.
+     *
+     * @return This instance if the flag is already set, otherwise a new wrapper over the keep-portable projection.
+     */
+    public PlatformCache withKeepPortable() {
+        if (keepPortable)
+            return this;
+
+        return new PlatformCache(platformCtx, cache.keepPortable(), true); // Must not touch the skip-store setting.
+    }
+
+    /**
+     * Returns a wrapper whose underlying cache uses an expiry policy built from the given values.
+     *
+     * @param create Value passed as the first argument of {@code InteropExpiryPolicy}.
+     * @param update Value passed as the second argument of {@code InteropExpiryPolicy}.
+     * @param access Value passed as the third argument of {@code InteropExpiryPolicy}.
+     * @return New wrapper with the same keep-portable flag.
+     */
+    public PlatformCache withExpiryPolicy(final long create, final long update, final long access) {
+        InteropExpiryPolicy plc = new InteropExpiryPolicy(create, update, access);
+
+        return new PlatformCache(platformCtx, cache.withExpiryPolicy(plc), keepPortable);
+    }
+
+    /**
+     * Returns a wrapper operating in asynchronous mode.
+     *
+     * @return This instance if the cache is already asynchronous, otherwise a new wrapper.
+     */
+    public PlatformCache withAsync() {
+        if (!cache.isAsync())
+            return new PlatformCache(platformCtx, (IgniteCache)cache.withAsync(), keepPortable);
+
+        return this;
+    }
+
+    /**
+     * Gets cache with no-retries mode enabled.
+     *
+     * @return Cache with no-retries mode enabled.
+     */
+    public PlatformCache withNoRetries() {
+        CacheOperationContext opCtx = cache.operationContext();
+
+        if (opCtx != null && opCtx.noRetries())
+            return this;
+
+        return new PlatformCache(platformCtx, cache.withNoRetries(), keepPortable);
+    }
+
+    /** {@inheritDoc} Handles operations that read arguments from the stream and answer with TRUE or FALSE. */
+    @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_PUT:
+                cache.put(reader.readObjectDetached(), reader.readObjectDetached());
+
+                return TRUE;
+
+            case OP_REMOVE_BOOL:
+                return cache.remove(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
+
+            case OP_REMOVE_ALL:
+                cache.removeAll(PlatformUtils.readSet(reader));
+
+                return TRUE;
+
+            case OP_PUT_ALL:
+                cache.putAll(PlatformUtils.readMap(reader));
+
+                return TRUE;
+
+            case OP_LOC_EVICT:
+                cache.localEvict(PlatformUtils.readCollection(reader));
+
+                return TRUE;
+
+            case OP_CONTAINS_KEY:
+                return cache.containsKey(reader.readObjectDetached()) ? TRUE : FALSE;
+
+            case OP_CONTAINS_KEYS:
+                return cache.containsKeys(PlatformUtils.readSet(reader)) ? TRUE : FALSE;
+
+            case OP_LOC_PROMOTE: {
+                cache.localPromote(PlatformUtils.readSet(reader));
+
+                break; // Continues to the shared "return TRUE" after the switch.
+            }
+
+            case OP_REPLACE_3:
+                return cache.replace(reader.readObjectDetached(), reader.readObjectDetached(),
+                    reader.readObjectDetached()) ? TRUE : FALSE;
+
+            case OP_LOC_LOAD_CACHE:
+                loadCache0(reader, true); // Local variant.
+
+                break;
+
+            case OP_LOAD_CACHE:
+                loadCache0(reader, false); // Non-local variant.
+
+                break;
+
+            case OP_CLEAR:
+                cache.clear(reader.readObjectDetached());
+
+                break;
+
+            case OP_CLEAR_ALL:
+                cache.clearAll(PlatformUtils.readSet(reader));
+
+                break;
+
+            case OP_LOCAL_CLEAR:
+                cache.localClear(reader.readObjectDetached());
+
+                break;
+
+            case OP_LOCAL_CLEAR_ALL:
+                cache.localClearAll(PlatformUtils.readSet(reader));
+
+                break;
+
+            case OP_PUT_IF_ABSENT: {
+                return cache.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
+            }
+
+            case OP_REPLACE_2: {
+                return cache.replace(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
+            }
+
+            case OP_REMOVE_OBJ: {
+                return cache.remove(reader.readObjectDetached()) ? TRUE : FALSE;
+            }
+
+            case OP_IS_LOCAL_LOCKED:
+                return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE;
+
+            default:
+                return super.processInStreamOutLong(type, reader); // Unknown codes are left to the parent class.
+        }
+
+        return TRUE; // Reached only by the cases that end with "break".
+    }
+
+    /**
+     * Reads an optional entry filter and the loader arguments from the stream, then loads the cache.
+     *
+     * @param reader Reader positioned at the predicate object.
+     * @param loc {@code true} to call {@code localLoadCache}, {@code false} to call {@code loadCache}.
+     */
+    private void loadCache0(PortableRawReaderEx reader, boolean loc) throws IgniteCheckedException {
+        Object pred = reader.readObjectDetached();
+
+        PlatformCacheEntryFilter filter = pred == null ? null :
+            platformCtx.createCacheEntryFilter(pred, reader.readLong());
+        Object[] loadArgs = reader.readObjectArray();
+
+        if (!loc)
+            cache.loadCache(filter, loadArgs);
+        else
+            cache.localLoadCache(filter, loadArgs);
+    }
+
+    /** {@inheritDoc} Handles the query operations: each reads its definition from the stream and returns a cursor or query. */
+    @Override protected Object processInStreamOutObject(int type, PortableRawReaderEx reader)
+        throws IgniteCheckedException {
+        switch (type) {
+            case OP_QRY_SQL:
+                return runQuery(reader, readSqlQuery(reader));
+
+            case OP_QRY_SQL_FIELDS:
+                return runFieldsQuery(reader, readFieldsQuery(reader));
+
+            case OP_QRY_TXT:
+                return runQuery(reader, readTextQuery(reader));
+
+            case OP_QRY_SCAN:
+                return runQuery(reader, readScanQuery(reader));
+
+            case OP_QRY_CONTINUOUS: {
+                long ptr = reader.readLong();
+                boolean loc = reader.readBoolean();
+                boolean hasFilter = reader.readBoolean();
+                Object filter = reader.readObjectDetached(); // Read unconditionally, whatever hasFilter says.
+                int bufSize = reader.readInt();
+                long timeInterval = reader.readLong();
+                boolean autoUnsubscribe = reader.readBoolean();
+                Query initQry = readInitialQuery(reader); // May be null when the type code is -1.
+
+                PlatformContinuousQuery qry = platformCtx.createContinuousQuery(ptr, hasFilter, filter);
+
+                qry.start(cache, loc, bufSize, timeInterval, autoUnsubscribe, initQry);
+
+                return qry;
+            }
+
+            default:
+                return super.processInStreamOutObject(type, reader); // Unknown codes are left to the parent class.
+        }
+    }
+
+    /**
+     * Read arguments for SQL query.
+     *
+     * @param reader Reader.
+     * @return Arguments.
+     */
+    @Nullable private Object[] readQueryArgs(PortableRawReaderEx reader) {
+        int cnt = reader.readInt();
+
+        if (cnt > 0) {
+            Object[] args = new Object[cnt];
+
+            for (int i = 0; i < cnt; i++)
+                args[i] = reader.readObjectDetached();
+
+            return args;
+        }
+        else
+            return null;
+    }
+
+    /** {@inheritDoc} Handles the operations that take no input: cache name and cache metrics. */
+    @Override protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
+        switch (type) {
+            case OP_GET_NAME:
+                writer.writeObject(cache.getName());
+
+                break;
+
+            case OP_METRICS: // NOTE(review): the write order below presumably has to match the reading side - confirm.
+                CacheMetrics metrics = cache.metrics();
+
+                writer.writeLong(metrics.getCacheGets());
+                writer.writeLong(metrics.getCachePuts());
+                writer.writeLong(metrics.getCacheHits());
+                writer.writeLong(metrics.getCacheMisses());
+                writer.writeLong(metrics.getCacheTxCommits());
+                writer.writeLong(metrics.getCacheTxRollbacks());
+                writer.writeLong(metrics.getCacheEvictions());
+                writer.writeLong(metrics.getCacheRemovals());
+                writer.writeFloat(metrics.getAveragePutTime());
+                writer.writeFloat(metrics.getAverageGetTime());
+                writer.writeFloat(metrics.getAverageRemoveTime());
+                writer.writeFloat(metrics.getAverageTxCommitTime());
+                writer.writeFloat(metrics.getAverageTxRollbackTime());
+                writer.writeString(metrics.name());
+                writer.writeLong(metrics.getOverflowSize());
+                writer.writeLong(metrics.getOffHeapEntriesCount());
+                writer.writeLong(metrics.getOffHeapAllocatedSize());
+                writer.writeInt(metrics.getSize());
+                writer.writeInt(metrics.getKeySize());
+                writer.writeBoolean(metrics.isEmpty());
+                writer.writeInt(metrics.getDhtEvictQueueCurrentSize());
+                writer.writeInt(metrics.getTxThreadMapSize());
+                writer.writeInt(metrics.getTxXidMapSize());
+                writer.writeInt(metrics.getTxCommitQueueSize());
+                writer.writeInt(metrics.getTxPrepareQueueSize());
+                writer.writeInt(metrics.getTxStartVersionCountsSize());
+                writer.writeInt(metrics.getTxCommittedVersionsSize());
+                writer.writeInt(metrics.getTxRolledbackVersionsSize());
+                writer.writeInt(metrics.getTxDhtThreadMapSize());
+                writer.writeInt(metrics.getTxDhtXidMapSize());
+                writer.writeInt(metrics.getTxDhtCommitQueueSize());
+                writer.writeInt(metrics.getTxDhtPrepareQueueSize());
+                writer.writeInt(metrics.getTxDhtStartVersionCountsSize());
+                writer.writeInt(metrics.getTxDhtCommittedVersionsSize());
+                writer.writeInt(metrics.getTxDhtRolledbackVersionsSize());
+                writer.writeBoolean(metrics.isWriteBehindEnabled());
+                writer.writeInt(metrics.getWriteBehindFlushSize());
+                writer.writeInt(metrics.getWriteBehindFlushThreadCount());
+                writer.writeLong(metrics.getWriteBehindFlushFrequency());
+                writer.writeInt(metrics.getWriteBehindStoreBatchSize());
+                writer.writeInt(metrics.getWriteBehindTotalCriticalOverflowCount());
+                writer.writeInt(metrics.getWriteBehindCriticalOverflowCount());
+                writer.writeInt(metrics.getWriteBehindErrorRetryCount());
+                writer.writeInt(metrics.getWriteBehindBufferSize());
+                writer.writeString(metrics.getKeyType());
+                writer.writeString(metrics.getValueType());
+                writer.writeBoolean(metrics.isStoreByValue());
+                writer.writeBoolean(metrics.isStatisticsEnabled());
+                writer.writeBoolean(metrics.isManagementEnabled());
+                writer.writeBoolean(metrics.isReadThrough());
+                writer.writeBoolean(metrics.isWriteThrough());
+                writer.writeFloat(metrics.getCacheHitPercentage());
+                writer.writeFloat(metrics.getCacheMissPercentage());
+
+                break;
+
+            default:
+                super.processOutStream(type, writer); // Unknown codes are left to the parent class.
+        }
+    }
+
+    /** {@inheritDoc} Handles operations that read arguments from the stream and write their result to the output stream. */
+    @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"})
+    @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer)
+        throws IgniteCheckedException {
+        switch (type) {
+            case OP_GET: {
+                writer.writeObjectDetached(cache.get(reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_GET_AND_PUT: {
+                writer.writeObjectDetached(cache.getAndPut(reader.readObjectDetached(), reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_GET_AND_REPLACE: {
+                writer.writeObjectDetached(cache.getAndReplace(reader.readObjectDetached(),
+                    reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_GET_AND_REMOVE: {
+                writer.writeObjectDetached(cache.getAndRemove(reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_GET_AND_PUT_IF_ABSENT: {
+                writer.writeObjectDetached(cache.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_PEEK: {
+                Object key = reader.readObjectDetached();
+
+                CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt());
+
+                writer.writeObjectDetached(cache.localPeek(key, modes));
+
+                break;
+            }
+
+            case OP_GET_ALL: {
+                Set keys = PlatformUtils.readSet(reader);
+
+                Map entries = cache.getAll(keys);
+
+                PlatformUtils.writeNullableMap(writer, entries);
+
+                break;
+            }
+
+            case OP_INVOKE: {
+                Object key = reader.readObjectDetached();
+
+                CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+
+                try {
+                    writer.writeObjectDetached(cache.invoke(key, proc));
+                }
+                catch (EntryProcessorException ex)
+                {
+                    if (ex.getCause() instanceof PlatformNativeException) // Write the native cause in place of a result.
+                        writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause());
+                    else
+                        throw ex; // Any other processor failure propagates to the caller.
+                }
+
+                break;
+            }
+
+            case OP_INVOKE_ALL: {
+                Set<Object> keys = PlatformUtils.readSet(reader);
+
+                CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+
+                writeInvokeAllResult(writer, cache.invokeAll(keys, proc));
+
+                break;
+            }
+
+            case OP_LOCK: // The lock is only created and registered here; it is not acquired.
+                writer.writeLong(registerLock(cache.lock(reader.readObjectDetached())));
+
+                break;
+
+            case OP_LOCK_ALL: // Same as OP_LOCK, for a collection of keys.
+                writer.writeLong(registerLock(cache.lockAll(PlatformUtils.readCollection(reader))));
+
+                break;
+
+            default:
+                super.processInStreamOutStream(type, reader, writer); // Unknown codes are left to the parent class.
+        }
+    }
+
+    /** {@inheritDoc} Partial-update failures and entry processor failures are converted here; the rest goes to the parent. */
+    @Override public Exception convertException(Exception e) {
+        Throwable cause = e.getCause();
+        if (e instanceof CachePartialUpdateException) {
+            CachePartialUpdateCheckedException checked = (CachePartialUpdateCheckedException)cause;
+            return new PlatformCachePartialUpdateException(checked, platformCtx, keepPortable);
+        }
+        else if (e instanceof CachePartialUpdateCheckedException)
+            return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e, platformCtx, keepPortable);
+        else if (cause instanceof EntryProcessorException)
+            return (EntryProcessorException)cause;
+
+        return super.convertException(e);
+    }
+
+    /**
+     * Serializes the map returned by the invokeAll cache method.
+     *
+     * @param writer Writer.
+     * @param results Per-key results; {@code null} is written as size -1.
+     */
+    private static void writeInvokeAllResult(PortableRawWriterEx writer, Map<Object, EntryProcessorResult> results) {
+        if (results == null) {
+            writer.writeInt(-1);
+            return;
+        }
+
+        writer.writeInt(results.size());
+
+        Iterator<Map.Entry<Object, EntryProcessorResult>> it = results.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<Object, EntryProcessorResult> e = it.next();
+            writer.writeObjectDetached(e.getKey());
+
+            EntryProcessorResult keyRes = e.getValue();
+
+            try {
+                Object val = keyRes.get();
+
+                writer.writeBoolean(false);  // Success marker, then the value.
+                writer.writeObjectDetached(val);
+            }
+            catch (Exception err) {
+                writer.writeBoolean(true);  // Failure marker, then the error.
+
+                writeError(writer, err);
+            }
+        }
+    }
+
+    /**
+     * Writes either the payload of a native exception, or the exception class name followed by its message.
+     * @param writer Writer.
+     * @param ex Exception to describe.
+     */
+    private static void writeError(PortableRawWriterEx writer, Exception ex) {
+        Throwable cause = ex.getCause();
+        boolean isNative = cause instanceof PlatformNativeException;
+
+        writer.writeObjectDetached(isNative ? ((PlatformNativeException)cause).cause() : ex.getClass().getName());
+        if (!isNative)
+            writer.writeObjectDetached(ex.getMessage());
+    }
+
+    /** {@inheritDoc} Returns the future obtained from the underlying cache. */
+    @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
+        return cache.future();
+    }
+
+    /** <inheritDoc /> */
+    @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) {
+        if (opId == OP_GET_ALL)
+            return WRITER_GET_ALL;
+
+        if (opId == OP_INVOKE)
+            return WRITER_INVOKE;
+
+        if (opId == OP_INVOKE_ALL)
+            return WRITER_INVOKE_ALL;
+
+        return null;
+    }
+
+    /**
+     * Clears the contents of the cache by delegating to the underlying cache, without notifying listeners or
+     * {@ignitelink javax.cache.integration.CacheWriter}s.
+     *
+     * @throws IllegalStateException if the cache is closed.
+     * @throws javax.cache.CacheException if there is a problem during the clear.
+     */
+    public void clear() throws IgniteCheckedException {
+        cache.clear();
+    }
+
+    /**
+     * Removes all entries by delegating to {@code removeAll()} of the underlying cache.
+     *
+     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     */
+    public void removeAll() throws IgniteCheckedException {
+        cache.removeAll();
+    }
+
+    /**
+     * Computes the cache size for the given peek modes.
+     *
+     * @param peekModes Encoded peek modes.
+     * @param loc {@code true} for the local size, {@code false} for the overall size.
+     * @return Size.
+     */
+    public int size(int peekModes, boolean loc) {
+        if (loc)
+            return cache.localSize(PlatformUtils.decodeCachePeekModes(peekModes));
+        return cache.size(PlatformUtils.decodeCachePeekModes(peekModes));
+    }
+
+    /**
+     * Creates a platform iterator over the cache entries.
+     * A fresh iterator is requested from the underlying cache on every call.
+     *
+     * @return Cache iterator.
+     */
+    public PlatformCacheIterator iterator() {
+        // The iterator of the underlying cache is wrapped directly.
+        return new PlatformCacheIterator(platformCtx, cache.iterator());
+    }
+
+    /**
+     * Creates a platform iterator over local entries.
+     *
+     * @param peekModes Encoded peek modes.
+     * @return Cache iterator.
+     */
+    public PlatformCacheIterator localIterator(int peekModes) {
+        CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(peekModes);
+
+        Iterable<Cache.Entry> locEntries = cache.localEntries(modes);
+
+        return new PlatformCacheIterator(platformCtx, locEntries.iterator());
+    }
+
+    /**
+     * Enters a lock: acquires the lock registered under the id via {@code lockInterruptibly()}.
+     *
+     * @param id Lock id, as returned when the lock was registered.
+     */
+    public void enterLock(long id) throws InterruptedException {
+        lock(id).lockInterruptibly();
+    }
+
+    /**
+     * Exits a lock: calls {@code unlock()} on the lock registered under the id.
+     *
+     * @param id Lock id, as returned when the lock was registered.
+     */
+    public void exitLock(long id) {
+        lock(id).unlock();
+    }
+
+    /**
+     * Attempts to enter a lock and returns whether it was acquired.
+     *
+     * @param id Lock id.
+     * @param timeout Timeout, in milliseconds. -1 makes one non-waiting {@code tryLock()} attempt (NOTE(review): not an infinite wait).
+     */
+    public boolean tryEnterLock(long id, long timeout) throws InterruptedException {
+        return timeout == -1
+            ? lock(id).tryLock()
+            : lock(id).tryLock(timeout, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Rebalances the cache and passes the resulting future to {@code PlatformFutureUtils.listen}.
+     *
+     * @param futId Future id handed to {@code PlatformFutureUtils.listen}.
+     */
+    public void rebalance(long futId) {
+        PlatformFutureUtils.listen(platformCtx, cache.rebalance().chain(new C1<IgniteFuture, Object>() {
+            @Override public Object apply(IgniteFuture fut) {
+                return null; // The chained future always yields null, whatever the rebalance future holds.
+            }
+        }), futId, PlatformFutureUtils.TYP_OBJ, this);
+    }
+
+    /**
+     * Unregister lock: removes it from the map of active locks. The lock itself is not unlocked here.
+     *
+     * @param id Lock id.
+     */
+    public void closeLock(long id){
+        Lock lock = lockMap.remove(id);
+
+        assert lock != null : "Failed to unregister lock: " + id; // Checked only when assertions are enabled.
+    }
+
+    /**
+     * Get lock by id from the map of active locks.
+     *
+     * @param id Id.
+     * @return Lock, or {@code null} if the id is unknown and assertions are disabled.
+     */
+    private Lock lock(long id) {
+        Lock lock = lockMap.get(id);
+
+        assert lock != null : "Lock not found for ID: " + id; // Checked only when assertions are enabled.
+
+        return lock;
+    }
+
+    /**
+     * Registers a lock in a map.
+     *
+     * @param lock Lock to register.
+     * @return Registered lock id.
+     */
+    private long registerLock(Lock lock) {
+        long id = LOCK_ID_GEN.incrementAndGet();
+
+        lockMap.put(id, lock);
+
+        return id;
+    }
+
+    /**
+     * Executes the query and wraps its cursor; the {@code reader} argument is not used.
+     */
+    private PlatformQueryCursor runQuery(PortableRawReaderEx reader, Query qry) throws IgniteCheckedException {
+        try {
+            QueryCursorEx cur = (QueryCursorEx)cache.query(qry);
+
+            int pageSize = qry.getPageSize() > 0 ? qry.getPageSize() : Query.DFLT_PAGE_SIZE;
+
+            return new PlatformQueryCursor(platformCtx, cur, pageSize);
+        }
+        catch (Exception e) {
+            throw PlatformUtils.unwrapQueryException(e);
+        }
+    }
+
+    /**
+     * Executes the fields query and wraps its cursor; the {@code reader} argument is not used.
+     */
+    private PlatformFieldsQueryCursor runFieldsQuery(PortableRawReaderEx reader, Query qry)
+        throws IgniteCheckedException {
+        try {
+            QueryCursorEx cur = (QueryCursorEx)cache.query(qry);
+            int pageSize = qry.getPageSize() > 0 ? qry.getPageSize() : Query.DFLT_PAGE_SIZE;
+
+            return new PlatformFieldsQueryCursor(platformCtx, cur, pageSize);
+        }
+        catch (Exception e) {
+            throw PlatformUtils.unwrapQueryException(e);
+        }
+    }
+
+    /**
+     * Reads a query type code and then the matching query; type code -1 yields {@code null}.
+     *
+     * @throws IgniteCheckedException If the type code is neither -1 nor scan, SQL or text.
+     */
+    private Query readInitialQuery(PortableRawReaderEx reader) throws IgniteCheckedException {
+        int qryType = reader.readInt();
+
+        if (qryType == -1)
+            return null;
+
+        if (qryType == OP_QRY_SCAN)
+            return readScanQuery(reader);
+
+        if (qryType == OP_QRY_SQL)
+            return readSqlQuery(reader);
+
+        if (qryType == OP_QRY_TXT)
+            return readTextQuery(reader);
+
+        throw new IgniteCheckedException("Unsupported query type: " + qryType);
+    }
+
+    /**
+     * Reads an SQL query: local flag, SQL text, type, page size, then the arguments.
+     */
+    private Query readSqlQuery(PortableRawReaderEx reader) {
+        boolean isLoc = reader.readBoolean();
+        String sqlText = reader.readString();
+        String typeName = reader.readString();
+        int pageSize = reader.readInt();
+        Object[] qryArgs = readQueryArgs(reader);
+
+        SqlQuery qry = new SqlQuery(typeName, sqlText);
+        return qry.setPageSize(pageSize).setArgs(qryArgs).setLocal(isLoc);
+    }
+
+    /**
+     * Reads an SQL fields query: local flag, SQL text, page size, then the arguments.
+     */
+    private Query readFieldsQuery(PortableRawReaderEx reader) {
+        boolean isLoc = reader.readBoolean();
+        String sqlText = reader.readString();
+        int pageSize = reader.readInt();
+        Object[] qryArgs = readQueryArgs(reader);
+
+        SqlFieldsQuery qry = new SqlFieldsQuery(sqlText);
+        return qry.setPageSize(pageSize).setArgs(qryArgs).setLocal(isLoc);
+    }
+
+    /**
+     * Reads a text query: local flag, search text, type, then page size.
+     */
+    private Query readTextQuery(PortableRawReaderEx reader) {
+        boolean isLoc = reader.readBoolean();
+        String searchText = reader.readString();
+        String typeName = reader.readString();
+        int pageSize = reader.readInt();
+
+        return new TextQuery(typeName, searchText).setPageSize(pageSize).setLocal(isLoc);
+    }
+
+    /**
+     * Reads a scan query: local flag, page size, optional partition, optional filter.
+     */
+    private Query readScanQuery(PortableRawReaderEx reader) {
+        boolean isLoc = reader.readBoolean();
+        int pageSize = reader.readInt();
+
+        Integer part = null;
+
+        if (reader.readBoolean())
+            part = reader.readInt();
+
+        ScanQuery scan = new ScanQuery().setPageSize(pageSize);
+        scan.setPartition(part);
+
+        Object pred = reader.readObjectDetached();
+
+        if (pred != null)
+            scan.setFilter(platformCtx.createCacheEntryFilter(pred, reader.readLong()));
+
+        scan.setLocal(isLoc);
+
+        return scan;
+    }
+
+    /**
+     * Writes error with EntryProcessorException cause.
+     */
+    private static class GetAllWriter implements PlatformFutureUtils.Writer {
+        /** <inheritDoc /> */
+        @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+            assert obj instanceof Map;
+
+            PlatformUtils.writeNullableMap(writer, (Map) obj);
+        }
+
+        /** <inheritDoc /> */
+        @Override public boolean canWrite(Object obj, Throwable err) {
+            return err == null;
+        }
+    }
+
+    /**
+     * Writes error with EntryProcessorException cause.
+     */
+    private static class EntryProcessorInvokeWriter implements PlatformFutureUtils.Writer {
+        /** <inheritDoc /> */
+        @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+            if (err == null) {
+                writer.writeBoolean(false);  // No error.
+
+                writer.writeObjectDetached(obj);
+            }
+            else {
+                writer.writeBoolean(true);  // Error.
+
+                writeError(writer, (Exception) err);
+            }
+        }
+
+        /** <inheritDoc /> */
+        @Override public boolean canWrite(Object obj, Throwable err) {
+            return true;
+        }
+    }
+
+    /**
+     * Writes results of InvokeAll method.
+     */
+    private static class EntryProcessorInvokeAllWriter implements PlatformFutureUtils.Writer {
+        /** <inheritDoc /> */
+        @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+            writeInvokeAllResult(writer, (Map)obj);
+        }
+
+        /** <inheritDoc /> */
+        @Override public boolean canWrite(Object obj, Throwable err) {
+            return obj != null && err == null;
+        }
+    }
+
+    /**
+     * Expiry policy built from three encoded {@code long} durations (create, update, access).
+     */
+    private static class InteropExpiryPolicy implements ExpiryPolicy {
+        /** Encoded duration which is converted to {@code null} ("unchanged"). */
+        private static final long DUR_UNCHANGED = -2;
+
+        /** Encoded duration which is converted to {@link Duration#ETERNAL}. */
+        private static final long DUR_ETERNAL = -1;
+
+        /** Encoded duration which is converted to {@link Duration#ZERO}. */
+        private static final long DUR_ZERO = 0;
+
+        /** Decoded duration returned for creation. */
+        private final Duration create;
+
+        /** Decoded duration returned for update. */
+        private final Duration update;
+
+        /** Decoded duration returned for access. */
+        private final Duration access;
+
+        /**
+         * Creates the policy, decoding every argument with {@link #convert(long)}.
+         *
+         * @param create Encoded expiry for create.
+         * @param update Encoded expiry for update.
+         * @param access Encoded expiry for access.
+         */
+        public InteropExpiryPolicy(long create, long update, long access) {
+            this.create = convert(create);
+            this.update = convert(update);
+            this.access = convert(access);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForCreation() {
+            return create;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForUpdate() {
+            return update;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForAccess() {
+            return access;
+        }
+
+        /**
+         * Decodes a duration: the three special values map to {@code null}, eternal and zero
+         * durations respectively; anything else is taken as a positive number of milliseconds.
+         *
+         * @param dur Encoded duration.
+         * @return Actual duration, {@code null} for {@link #DUR_UNCHANGED}.
+         */
+        private static Duration convert(long dur) {
+            if (dur == DUR_UNCHANGED)
+                return null;
+
+            if (dur == DUR_ETERNAL)
+                return Duration.ETERNAL;
+
+            if (dur == DUR_ZERO)
+                return Duration.ZERO;
+
+            // Only positive millisecond values are expected past the special values.
+            assert dur > 0;
+
+            return new Duration(TimeUnit.MILLISECONDS, dur);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
new file mode 100644
index 0000000..5f8ec8f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * Cache entry filter whose {@link #apply(Object, Object)} is evaluated by the native platform.
+ */
+public class PlatformCacheEntryFilterImpl extends PlatformAbstractPredicate implements PlatformCacheEntryFilter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Empty constructor required by {@link java.io.Externalizable}.
+     */
+    public PlatformCacheEntryFilterImpl() {
+        super();
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param pred .Net portable predicate, must not be {@code null}.
+     * @param ptr Pointer to predicate in the native platform.
+     * @param ctx Platform context.
+     */
+    public PlatformCacheEntryFilterImpl(Object pred, long ptr, PlatformContext ctx) {
+        super(pred, ptr, ctx);
+
+        assert pred != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(Object k, Object v) {
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream memOut = mem.output();
+
+            // Key goes first, value second.
+            PortableRawWriterEx rawWriter = ctx.writer(memOut);
+
+            rawWriter.writeObject(k);
+            rawWriter.writeObject(v);
+
+            memOut.synchronize();
+
+            // Any non-zero result of the native call means the entry passes the filter.
+            return ctx.gateway().cacheEntryFilterApply(ptr, mem.pointer()) != 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClose() {
+        // Zero pointer means there is no native filter to destroy.
+        if (ptr != 0) {
+            assert ctx != null;
+
+            ctx.gateway().cacheEntryFilterDestroy(ptr);
+
+            ptr = 0;
+        }
+    }
+
+    /**
+     * Injects Ignite instance: resolves the platform context and, when there is no native
+     * pointer yet, creates the native filter from the predicate.
+     *
+     * @param ignite Ignite instance.
+     */
+    @IgniteInstanceResource
+    public void setIgniteInstance(Ignite ignite) {
+        ctx = PlatformUtils.platformContext(ignite);
+
+        if (ptr == 0) {
+            try (PlatformMemory mem = ctx.memory().allocate()) {
+                PlatformOutputStream memOut = mem.output();
+
+                PortableRawWriterEx rawWriter = ctx.writer(memOut);
+
+                rawWriter.writeObject(pred);
+
+                memOut.synchronize();
+
+                ptr = ctx.gateway().cacheEntryFilterCreate(mem.pointer());
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
new file mode 100644
index 0000000..f59a63f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Cache entry processor that hands the actual processing over to the native platform.
+ */
+public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProcessor, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Result state: entry was left untouched. */
+    private static final byte ENTRY_STATE_INTACT = 0;
+
+    /** Result state: a new value was set on the entry. */
+    private static final byte ENTRY_STATE_VALUE_SET = 1;
+
+    /** Result state: the entry was removed. */
+    private static final byte ENTRY_STATE_REMOVED = 2;
+
+    /** Result state: processor failed, error is written as portable object. */
+    private static final byte ENTRY_STATE_ERR_PORTABLE = 3;
+
+    /** Result state: processor failed, error is written as string. */
+    private static final byte ENTRY_STATE_ERR_STRING = 4;
+
+    /** Native portable processor. */
+    private Object proc;
+
+    /** Pointer to processor in the native platform; not serialized. */
+    private transient long ptr;
+
+    /**
+     * Empty constructor required by {@link java.io.Externalizable}.
+     */
+    public PlatformCacheEntryProcessorImpl() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param proc Native portable processor.
+     * @param ptr Pointer to processor in the native platform.
+     */
+    public PlatformCacheEntryProcessorImpl(Object proc, long ptr) {
+        this.proc = proc;
+        this.ptr = ptr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object process(MutableEntry entry, Object... args)
+        throws EntryProcessorException {
+        try {
+            IgniteKernal kernal = (IgniteKernal)entry.unwrap(Ignite.class);
+
+            PlatformProcessor platformProc;
+
+            try {
+                platformProc = PlatformUtils.platformProcessor(kernal);
+            }
+            catch (IllegalStateException e) {
+                // Failure to resolve the platform processor is reported as a processing failure.
+                throw new EntryProcessorException(e);
+            }
+
+            platformProc.awaitStart();
+
+            PlatformContext platformCtx = platformProc.context();
+
+            return execute0(platformCtx, entry);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /**
+     * Runs the native processor against the entry: writes the request to one memory chunk,
+     * invokes the gateway and reads the response from another chunk.
+     *
+     * @param ctx Context.
+     * @param entry Entry.
+     * @return Processing result.
+     * @throws org.apache.ignite.IgniteCheckedException If failed.
+     */
+    private Object execute0(PlatformContext ctx, MutableEntry entry)
+        throws IgniteCheckedException {
+        try (PlatformMemory reqMem = ctx.memory().allocate()) {
+            PlatformOutputStream reqOut = reqMem.output();
+
+            writeEntryAndProcessor(entry, ctx.writer(reqOut));
+
+            reqOut.synchronize();
+
+            try (PlatformMemory resMem = ctx.memory().allocate()) {
+                PlatformInputStream resIn = resMem.input();
+
+                ctx.gateway().cacheInvoke(reqMem.pointer(), resMem.pointer());
+
+                resIn.synchronize();
+
+                return readResultAndUpdateEntry(ctx, entry, ctx.reader(resIn));
+            }
+        }
+    }
+
+    /**
+     * Writes the entry key and value followed by the processor reference: either a flag and
+     * the native pointer, or a flag and the processor object itself.
+     *
+     * @param entry Entry to process.
+     * @param writer Writer.
+     */
+    private void writeEntryAndProcessor(MutableEntry entry, PortableRawWriterEx writer) {
+        writer.writeObject(entry.getKey());
+        writer.writeObject(entry.getValue());
+
+        if (ptr == 0) {
+            // No native pointer (we are on a remote node): send processor holder back to native.
+            writer.writeBoolean(false);
+            writer.writeObject(proc);
+        }
+        else {
+            // Native pointer is available: processor can be executed locally.
+            writer.writeBoolean(true);
+            writer.writeLong(ptr);
+        }
+    }
+
+    /**
+     * Reads the state byte, applies it to the entry (set value / remove / nothing) or throws
+     * when the state denotes an error, and finally reads the processing result.
+     *
+     * @param ctx Context, used to create an exception from a portable native error.
+     * @param entry Mutable entry to update.
+     * @param reader Reader.
+     * @return Entry processing result.
+     * @throws javax.cache.processor.EntryProcessorException If processing has failed in user code.
+     */
+    @SuppressWarnings("unchecked")
+    private Object readResultAndUpdateEntry(PlatformContext ctx, MutableEntry entry, PortableRawReaderEx reader) {
+        byte state = reader.readByte();
+
+        if (state == ENTRY_STATE_VALUE_SET)
+            entry.setValue(reader.readObjectDetached());
+        else if (state == ENTRY_STATE_REMOVED)
+            entry.remove();
+        else if (state == ENTRY_STATE_ERR_PORTABLE) {
+            // Full exception.
+            Object nativeErr = reader.readObjectDetached();
+
+            assert nativeErr != null;
+
+            throw new EntryProcessorException("Failed to execute native cache entry processor.",
+                ctx.createNativeException(nativeErr));
+        }
+        else if (state == ENTRY_STATE_ERR_STRING) {
+            // Native exception was not serializable, only the message is available.
+            String errMsg = reader.readString();
+
+            assert errMsg != null;
+
+            throw new EntryProcessorException("Failed to execute native cache entry processor: " + errMsg);
+        }
+        else
+            assert state == ENTRY_STATE_INTACT;
+
+        return reader.readObjectDetached();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // Only the processor object is serialized; the native pointer is not.
+        out.writeObject(proc);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        proc = in.readObject();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java
new file mode 100644
index 0000000..78ca683
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import java.util.Iterator;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+
+/**
+ * Platform target wrapping an iterator over cache entries; each request writes the next entry.
+ */
+public class PlatformCacheIterator extends PlatformAbstractTarget {
+    /** Operation: next entry. */
+    private static final int OP_NEXT = 1;
+
+    /** Wrapped iterator. */
+    private final Iterator<Cache.Entry> iter;
+
+    /**
+     * Constructor.
+     *
+     * @param platformCtx Context.
+     * @param iter Iterator.
+     */
+    public PlatformCacheIterator(PlatformContext platformCtx, Iterator<Cache.Entry> iter) {
+        super(platformCtx);
+
+        this.iter = iter;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
+        // Everything except OP_NEXT is delegated to the parent.
+        if (type != OP_NEXT) {
+            super.processOutStream(type, writer);
+
+            return;
+        }
+
+        // Exhausted iterator: only the "no entry" flag is written.
+        if (!iter.hasNext()) {
+            writer.writeBoolean(false);
+
+            return;
+        }
+
+        Cache.Entry entry = iter.next();
+
+        assert entry != null;
+
+        // "Has entry" flag, then key and value.
+        writer.writeBoolean(true);
+
+        writer.writeObjectDetached(entry.getKey());
+        writer.writeObjectDetached(entry.getValue());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java
new file mode 100644
index 0000000..ef17a06
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformExtendedException;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+
+import java.util.Collection;
+
+/**
+ * Platform exception wrapping {@link CachePartialUpdateCheckedException}; its extra data
+ * consists of the keep-portable flag and the failed keys of the cause.
+ */
+public class PlatformCachePartialUpdateException extends PlatformExtendedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Keep portable flag. */
+    private final boolean keepPortable;
+
+    /**
+     * Constructor.
+     *
+     * @param cause Root cause.
+     * @param ctx Context.
+     * @param keepPortable Keep portable flag.
+     */
+    public PlatformCachePartialUpdateException(CachePartialUpdateCheckedException cause, PlatformContext ctx,
+        boolean keepPortable) {
+        super(cause, ctx);
+
+        this.keepPortable = keepPortable;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeData(PortableRawWriterEx writer) {
+        // Cause is always the partial update exception passed to the constructor.
+        CachePartialUpdateCheckedException partialErr = (CachePartialUpdateCheckedException)getCause();
+
+        Collection failedKeys = partialErr.failedKeys();
+
+        writer.writeBoolean(keepPortable);
+
+        PlatformUtils.writeNullableCollection(writer, failedKeys);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
new file mode 100644
index 0000000..9dd7416
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.affinity;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Platform target exposing cache {@link Affinity} operations; requests are dispatched by operation code.
+ */
+@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources"})
+public class PlatformAffinity extends PlatformAbstractTarget {
+    /** */
+    public static final int OP_AFFINITY_KEY = 1;
+
+    /** */
+    public static final int OP_ALL_PARTITIONS = 2;
+
+    /** */
+    public static final int OP_BACKUP_PARTITIONS = 3;
+
+    /** */
+    public static final int OP_IS_BACKUP = 4;
+
+    /** */
+    public static final int OP_IS_PRIMARY = 5;
+
+    /** */
+    public static final int OP_IS_PRIMARY_OR_BACKUP = 6;
+
+    /** */
+    public static final int OP_MAP_KEY_TO_NODE = 7;
+
+    /** */
+    public static final int OP_MAP_KEY_TO_PRIMARY_AND_BACKUPS = 8;
+
+    /** */
+    public static final int OP_MAP_KEYS_TO_NODES = 9;
+
+    /** */
+    public static final int OP_MAP_PARTITION_TO_NODE = 10;
+
+    /** */
+    public static final int OP_MAP_PARTITION_TO_PRIMARY_AND_BACKUPS = 11;
+
+    /** */
+    public static final int OP_MAP_PARTITIONS_TO_NODES = 12;
+
+    /** */
+    public static final int OP_PARTITION = 13;
+
+    /** */
+    public static final int OP_PRIMARY_PARTITIONS = 14;
+
+    /** Closure mapping a node to its ID ({@code null} node maps to {@code null}). */
+    private static final C1<ClusterNode, UUID> TO_NODE_ID = new C1<ClusterNode, UUID>() {
+        @Nullable @Override public UUID apply(ClusterNode node) {
+            return node == null ? null : node.id();
+        }
+    };
+
+    /** Underlying cache affinity. */
+    private final Affinity<Object> aff;
+
+    /** Discovery manager, used to resolve nodes by ID. */
+    private final GridDiscoveryManager discovery;
+
+    /**
+     * Constructor.
+     *
+     * @param platformCtx Context.
+     * @param igniteCtx Ignite context.
+     * @param name Cache name.
+     * @throws IgniteCheckedException If affinity for the given cache name could not be obtained.
+     */
+    public PlatformAffinity(PlatformContext platformCtx, GridKernalContext igniteCtx, @Nullable String name)
+        throws IgniteCheckedException {
+        super(platformCtx);
+
+        Affinity<Object> aff0 = igniteCtx.grid().affinity(name);
+
+        if (aff0 == null)
+            throw new IgniteCheckedException("Cache with the given name doesn't exist: " + name);
+
+        aff = aff0;
+        discovery = igniteCtx.discovery();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_PARTITION: {
+                Object key = reader.readObjectDetached();
+
+                return aff.partition(key);
+            }
+
+            case OP_IS_PRIMARY: {
+                UUID id = reader.readUuid();
+                Object key = reader.readObjectDetached();
+
+                ClusterNode node = discovery.node(id);
+
+                // Unknown node yields FALSE.
+                return node != null && aff.isPrimary(node, key) ? TRUE : FALSE;
+            }
+
+            case OP_IS_BACKUP: {
+                UUID id = reader.readUuid();
+                Object key = reader.readObjectDetached();
+
+                ClusterNode node = discovery.node(id);
+
+                // Unknown node yields FALSE.
+                return node != null && aff.isBackup(node, key) ? TRUE : FALSE;
+            }
+
+            case OP_IS_PRIMARY_OR_BACKUP: {
+                UUID id = reader.readUuid();
+                Object key = reader.readObjectDetached();
+
+                ClusterNode node = discovery.node(id);
+
+                // Unknown node yields FALSE.
+                return node != null && aff.isPrimaryOrBackup(node, key) ? TRUE : FALSE;
+            }
+
+            default:
+                return super.processInStreamOutLong(type, reader);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"})
+    @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer)
+        throws IgniteCheckedException {
+        switch (type) {
+            case OP_PRIMARY_PARTITIONS: {
+                UUID id = reader.readObject();
+
+                ClusterNode node = discovery.node(id);
+
+                // Unknown node yields an empty array.
+                if (node == null)
+                    writer.writeIntArray(U.EMPTY_INTS);
+                else
+                    writer.writeIntArray(aff.primaryPartitions(node));
+
+                break;
+            }
+
+            case OP_BACKUP_PARTITIONS: {
+                UUID id = reader.readObject();
+
+                ClusterNode node = discovery.node(id);
+
+                // Unknown node yields an empty array.
+                if (node == null)
+                    writer.writeIntArray(U.EMPTY_INTS);
+                else
+                    writer.writeIntArray(aff.backupPartitions(node));
+
+                break;
+            }
+
+            case OP_ALL_PARTITIONS: {
+                UUID id = reader.readObject();
+
+                ClusterNode node = discovery.node(id);
+
+                // Unknown node yields an empty array.
+                if (node == null)
+                    writer.writeIntArray(U.EMPTY_INTS);
+                else
+                    writer.writeIntArray(aff.allPartitions(node));
+
+                break;
+            }
+
+            case OP_AFFINITY_KEY: {
+                Object key = reader.readObjectDetached();
+
+                Object affKey = aff.affinityKey(key);
+
+                writer.writeObject(affKey);
+
+                break;
+            }
+
+            case OP_MAP_KEY_TO_NODE: {
+                Object key = reader.readObjectDetached();
+
+                platformCtx.writeNode(writer, aff.mapKeyToNode(key));
+
+                break;
+            }
+
+            case OP_MAP_PARTITION_TO_NODE: {
+                int partId = reader.readObject();
+
+                platformCtx.writeNode(writer, aff.mapPartitionToNode(partId));
+
+                break;
+            }
+
+            case OP_MAP_KEY_TO_PRIMARY_AND_BACKUPS: {
+                Object key = reader.readObjectDetached();
+
+                Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+
+                platformCtx.writeNodes(writer, nodes);
+
+                break;
+            }
+
+            case OP_MAP_PARTITION_TO_PRIMARY_AND_BACKUPS: {
+                int partId = reader.readObject();
+
+                Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(partId);
+
+                platformCtx.writeNodes(writer, nodes);
+
+                break;
+            }
+
+            case OP_MAP_KEYS_TO_NODES: {
+                Collection<Object> keys = reader.readCollection();
+
+                Map<ClusterNode, Collection<Object>> keysByNode = aff.mapKeysToNodes(keys);
+
+                // Size first, then (node ID, keys) pairs.
+                writer.writeInt(keysByNode.size());
+
+                for (Map.Entry<ClusterNode, Collection<Object>> mapping : keysByNode.entrySet()) {
+                    ClusterNode node = mapping.getKey();
+
+                    platformCtx.addNode(node);
+
+                    writer.writeUuid(node.id());
+                    writer.writeObject(mapping.getValue());
+                }
+
+                break;
+            }
+
+            case OP_MAP_PARTITIONS_TO_NODES: {
+                Collection<Integer> partIds = reader.readCollection();
+
+                Map<Integer, ClusterNode> nodesByPart = aff.mapPartitionsToNodes(partIds);
+
+                // Size first, then (partition, node ID) pairs.
+                writer.writeInt(nodesByPart.size());
+
+                for (Map.Entry<Integer, ClusterNode> mapping : nodesByPart.entrySet()) {
+                    ClusterNode node = mapping.getValue();
+
+                    platformCtx.addNode(node);
+
+                    writer.writeInt(mapping.getKey());
+
+                    writer.writeUuid(node.id());
+                }
+
+                break;
+            }
+
+            default:
+                super.processInStreamOutStream(type, reader, writer);
+        }
+    }
+
+    /**
+     * @return Number of partitions reported by the underlying affinity.
+     */
+    public int partitions() {
+        return aff.partitions();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
new file mode 100644
index 0000000..6c2c873
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import java.util.Iterator;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+
+/**
+ * Base class for platform query cursors: writes entries of an underlying cursor singly, in batches or all at once.
+ */
+public abstract class PlatformAbstractQueryCursor<T> extends PlatformAbstractTarget implements AutoCloseable {
+    /** Get all entries. */
+    private static final int OP_GET_ALL = 1;
+
+    /** Get a batch of entries (at most {@code batchSize}). */
+    private static final int OP_GET_BATCH = 2;
+
+    /** Get single entry. */
+    private static final int OP_GET_SINGLE = 3;
+
+    /** Underlying cursor. */
+    private final QueryCursorEx<T> cursor;
+
+    /** Batch size. */
+    private final int batchSize;
+
+    /** Underlying iterator; {@code null} until {@link #iterator()} is called. */
+    private Iterator<T> iter;
+
+    /**
+     * Constructor.
+     *
+     * @param platformCtx Context.
+     * @param cursor Underlying cursor.
+     * @param batchSize Batch size.
+     */
+    public PlatformAbstractQueryCursor(PlatformContext platformCtx, QueryCursorEx<T> cursor, int batchSize) {
+        super(platformCtx);
+
+        this.cursor = cursor;
+        this.batchSize = batchSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void processOutStream(int type, final PortableRawWriterEx writer) throws IgniteCheckedException {
+        switch (type) {
+            case OP_GET_BATCH: {
+                assert iter != null : "iterator() has not been called";
+
+                try {
+                    int cntPos = writer.reserveInt(); // Count is not known yet; it is written at this position below.
+
+                    int cnt;
+
+                    for (cnt = 0; cnt < batchSize; cnt++) {
+                        if (iter.hasNext())
+                            write(writer, iter.next());
+                        else
+                            break;
+                    }
+
+                    writer.writeInt(cntPos, cnt);
+                }
+                catch (Exception err) {
+                    throw PlatformUtils.unwrapQueryException(err);
+                }
+
+                break;
+            }
+
+            case OP_GET_SINGLE: {
+                assert iter != null : "iterator() has not been called";
+
+                try {
+                    if (iter.hasNext()) {
+                        write(writer, iter.next());
+
+                        return;
+                    }
+                }
+                catch (Exception err) {
+                    throw PlatformUtils.unwrapQueryException(err);
+                }
+
+                throw new IgniteCheckedException("No more data available.");
+            }
+
+            case OP_GET_ALL: {
+                try {
+                    int pos = writer.reserveInt(); // Filled in with the consumed entry count once getAll() returns.
+
+                    Consumer<T> consumer = new Consumer<>(this, writer);
+
+                    cursor.getAll(consumer);
+
+                    writer.writeInt(pos, consumer.cnt);
+                }
+                catch (Exception err) {
+                    throw PlatformUtils.unwrapQueryException(err);
+                }
+
+                break;
+            }
+
+            default:
+                super.processOutStream(type, writer);
+        }
+    }
+
+    /**
+     * Obtain an iterator from the underlying cursor and keep it for subsequent batch and single reads.
+     */
+    public void iterator() {
+        iter = cursor.iterator();
+    }
+
+    /**
+     * Check whether next iterator entry exists.
+     *
+     * @return {@code True} if exists.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public boolean iteratorHasNext() {
+        assert iter != null : "iterator() has not been called";
+
+        return iter.hasNext();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        cursor.close();
+    }
+
+    /**
+     * Write value to the stream. Extension point to perform conversions on the object before writing it.
+     *
+     * @param writer Writer.
+     * @param val Value.
+     */
+    protected abstract void write(PortableRawWriterEx writer, T val);
+
+    /**
+     * Query cursor consumer: writes every consumed value through the owning cursor and counts the values.
+     */
+    private static class Consumer<T> implements QueryCursorEx.Consumer<T> {
+        /** Current query cursor. */
+        private final PlatformAbstractQueryCursor<T> cursor;
+
+        /** Writer. */
+        private final PortableRawWriterEx writer;
+
+        /** Count. */
+        private int cnt;
+
+        /**
+         * Constructor.
+         * @param cursor Current query cursor whose {@code write} method is applied to each consumed value.
+         * @param writer Writer.
+         */
+        public Consumer(PlatformAbstractQueryCursor<T> cursor, PortableRawWriterEx writer) {
+            this.cursor = cursor;
+            this.writer = writer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void consume(T val) throws IgniteCheckedException {
+            cursor.write(writer, val);
+
+            cnt++;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
new file mode 100644
index 0000000..453e233
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import java.io.ObjectStreamException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.Query;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformTarget;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+
+/**
+ * Interop continuous query handle.
+ */
+public class PlatformContinuousQueryImpl implements PlatformContinuousQuery {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Context. */
+    protected final PlatformContext platformCtx;
+
+    /** Whether filter exists. */
+    private final boolean hasFilter;
+
+    /** Native filter in serialized form. If null, then filter is either not set, or this is local query. */
+    protected final Object filter;
+
+    /** Pointer to native counterpart; zero if closed. */
+    private long ptr;
+
+    /** Cursor to handle filter close. */
+    private QueryCursor cursor;
+
+    /** Lock for concurrency control. */
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /** Wrapped initial qry cursor. */
+    private PlatformQueryCursor initialQryCur;
+
+    /**
+     * Constructor.
+     *
+     * @param platformCtx Context.
+     * @param ptr Pointer to native counterpart.
+     * @param hasFilter Whether filter exists.
+     * @param filter Filter.
+     */
+    public PlatformContinuousQueryImpl(PlatformContext platformCtx, long ptr, boolean hasFilter, Object filter) {
+        assert ptr != 0L;
+
+        this.platformCtx = platformCtx;
+        this.ptr = ptr;
+        this.hasFilter = hasFilter;
+        this.filter = filter;
+    }
+
+    /**
+     * Start query execution. On any failure the query is closed and the unwrapped error is rethrown.
+     *
+     * @param cache Cache.
+     * @param loc Local flag.
+     * @param bufSize Buffer size.
+     * @param timeInterval Time interval.
+     * @param autoUnsubscribe Auto-unsubscribe flag.
+     * @param initialQry Initial query.
+     */
+    @SuppressWarnings("unchecked")
+    public void start(IgniteCacheProxy cache, boolean loc, int bufSize, long timeInterval, boolean autoUnsubscribe,
+        Query initialQry) throws IgniteCheckedException {
+        assert !loc || filter == null;
+
+        lock.writeLock().lock();
+
+        try {
+            try {
+                ContinuousQuery qry = new ContinuousQuery();
+
+                qry.setLocalListener(this);
+                qry.setRemoteFilter(this); // Filter must be set always for correct resource release.
+                qry.setPageSize(bufSize);
+                qry.setTimeInterval(timeInterval);
+                qry.setAutoUnsubscribe(autoUnsubscribe);
+                qry.setInitialQuery(initialQry);
+
+                cursor = cache.query(qry.setLocal(loc));
+
+                if (initialQry != null)
+                    initialQryCur = new PlatformQueryCursor(platformCtx, new QueryCursorEx<Cache.Entry>() {
+                        @Override public Iterator<Cache.Entry> iterator() {
+                            return cursor.iterator();
+                        }
+
+                        @Override public List<Cache.Entry> getAll() {
+                            return cursor.getAll();
+                        }
+
+                        @Override public void close() {
+                            // No-op: do not close whole continuous query when initial query cursor closes.
+                        }
+
+                        @Override public void getAll(Consumer<Cache.Entry> clo) throws IgniteCheckedException {
+                            for (Cache.Entry t : this)
+                                clo.consume(t);
+                        }
+
+                        @Override public List<GridQueryFieldMetadata> fieldsMeta() {
+                            return null;
+                        }
+                    }, initialQry.getPageSize() > 0 ? initialQry.getPageSize() : Query.DFLT_PAGE_SIZE);
+            }
+            catch (Exception e) {
+                try {
+                    close0();
+                }
+                catch (Exception ignored) {
+                    // Ignore: the original failure is rethrown below.
+                }
+
+                throw PlatformUtils.unwrapQueryException(e);
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void onUpdated(Iterable evts) throws CacheEntryListenerException {
+        lock.readLock().lock();
+
+        try {
+            if (ptr == 0)
+                throw new CacheEntryListenerException("Failed to notify listener because it has been closed.");
+
+            PlatformUtils.applyContinuousQueryEvents(platformCtx, ptr, evts);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException {
+        lock.readLock().lock();
+
+        try {
+            if (ptr == 0)
+                throw new CacheEntryListenerException("Failed to evaluate the filter because it has been closed.");
+
+            return !hasFilter || PlatformUtils.evaluateContinuousQueryEvent(platformCtx, ptr, evt);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onQueryUnregister() {
+        close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        lock.writeLock().lock();
+
+        try {
+            close0();
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"UnusedDeclaration", "unchecked"})
+    @Override public PlatformTarget getInitialQueryCursor() {
+        return initialQryCur;
+    }
+
+    /**
+     * Internal close routine; callers in this class invoke it with the write lock held. No-op if already closed.
+     */
+    private void close0() {
+        if (ptr != 0) {
+            long ptr0 = ptr;
+            ptr = 0;
+            try {
+                if (cursor != null)
+                    cursor.close();
+            }
+            finally {
+                // ptr is already zeroed, so this is the only chance to release the native filter.
+                platformCtx.gateway().continuousQueryFilterRelease(ptr0);
+            }
+        }
+    }
+
+    /**
+     * Replacer for remote filter.
+     *
+     * @return Filter to be deployed on remote node.
+     * @throws ObjectStreamException If failed.
+     */
+    Object writeReplace() throws ObjectStreamException {
+        return filter == null ? null : platformCtx.createContinuousQueryFilter(filter);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java
new file mode 100644
index 0000000..71aa38c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Continuous query filter deployed on remote nodes.
+ */
+public class PlatformContinuousQueryRemoteFilter implements PlatformContinuousQueryFilter, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Lock for concurrency control. */
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /** Native filter in serialized form. */
+    private Object filter;
+
+    /** Grid hosting the filter. */
+    @IgniteInstanceResource
+    private transient Ignite grid;
+
+    /** Native platform pointer. */
+    private transient volatile long ptr;
+
+    /** Close flag. Once set, none requests to native platform is possible. */
+    private transient boolean closed;
+
+    /**
+     * {@link java.io.Externalizable} support.
+     */
+    public PlatformContinuousQueryRemoteFilter() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param filter Serialized native filter.
+     */
+    public PlatformContinuousQueryRemoteFilter(Object filter) {
+        assert filter != null;
+
+        this.filter = filter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException {
+        long ptr0 = ptr;
+
+        if (ptr0 == 0)
+            deploy();
+
+        lock.readLock().lock();
+
+        try {
+            if (closed)
+                throw new CacheEntryListenerException("Failed to evaluate the filter because it has been closed.");
+
+            PlatformContext platformCtx = PlatformUtils.platformContext(grid);
+
+            return PlatformUtils.evaluateContinuousQueryEvent(platformCtx, ptr, evt);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Deploy filter to native platform.
+     */
+    private void deploy() {
+        lock.writeLock().lock();
+
+        try {
+            // 1. Do not deploy if the filter has been closed concurrently.
+            if (closed)
+                throw new CacheEntryListenerException("Failed to deploy the filter because it has been closed.");
+
+            // 2. Deploy.
+            PlatformContext ctx = PlatformUtils.platformContext(grid);
+
+            try (PlatformMemory mem = ctx.memory().allocate()) {
+                PlatformOutputStream out = mem.output();
+
+                PortableRawWriterEx writer = ctx.writer(out);
+
+                writer.writeObject(filter);
+
+                out.synchronize();
+
+                ptr = ctx.gateway().continuousQueryFilterCreate(mem.pointer());
+            }
+            catch (Exception e) {
+                // 3. Close in case of failure.
+                close();
+
+                throw new CacheEntryListenerException("Failed to deploy the filter.", e);
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onQueryUnregister() {
+        lock.writeLock().lock();
+
+        try {
+            close();
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Close the filter.
+     */
+    private void close() {
+        if (!closed) {
+            try {
+                if (ptr != 0) {
+                    try {
+                        PlatformUtils.platformContext(grid).gateway().continuousQueryFilterRelease(ptr);
+                    }
+                    finally {
+                        // Nullify the pointer in any case.
+                        ptr = 0;
+                    }
+                }
+            }
+            finally {
+                closed = true;
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        filter = in.readObject();
+
+        assert filter != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(PlatformContinuousQueryRemoteFilter.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java
new file mode 100644
index 0000000..44a4f14
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import java.util.List;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+
+/**
+ * Interop cursor for fields query.
+ */
+public class PlatformFieldsQueryCursor extends PlatformAbstractQueryCursor<List<?>> {
+    /**
+     * Constructor.
+     *
+     * @param platformCtx Platform context.
+     * @param cursor Backing cursor.
+     * @param batchSize Batch size.
+     */
+    public PlatformFieldsQueryCursor(PlatformContext platformCtx, QueryCursorEx<List<?>> cursor, int batchSize) {
+        super(platformCtx, cursor, batchSize);
+    }
+
+    /** Writes the number of fields followed by each field value as a detached object. */
+    @Override protected void write(PortableRawWriterEx writer, List<?> vals) {
+        assert vals != null;
+
+        writer.writeInt(vals.size());
+
+        for (Object val : vals)
+            writer.writeObjectDetached(val);
+    }
+}
\ No newline at end of file


Mime
View raw message