ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [12/14] ignite git commit: IGNITE-1513: Merged Java to core module.
Date Fri, 18 Sep 2015 10:04:16 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java
new file mode 100644
index 0000000..410e4de
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import javax.cache.Cache;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+
+/**
+ * Interop cursor for regular queries; each {@link Cache.Entry} is written as its key followed by its value.
+ */
+public class PlatformQueryCursor extends PlatformAbstractQueryCursor<Cache.Entry> {
+    /**
+     * Constructor; all arguments are passed through to the superclass constructor.
+     *
+     * @param platformCtx Platform context.
+     * @param cursor Backing cursor.
+     * @param batchSize Batch size.
+     */
+    public PlatformQueryCursor(PlatformContext platformCtx, QueryCursorEx<Cache.Entry> cursor, int batchSize) {
+        super(platformCtx, cursor, batchSize);
+    }
+
+    /** {@inheritDoc} Writes the entry key and then the entry value, both via {@code writeObjectDetached}. */
+    @Override protected void write(PortableRawWriterEx writer, Cache.Entry val) {
+        writer.writeObjectDetached(val.getKey());
+        writer.writeObjectDetached(val.getValue());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java
new file mode 100644
index 0000000..a741f0f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.store;
+
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+
+/**
+ * Platform cache store callback; subclasses implement {@link #invoke0} to consume the callback data.
+ */
+public abstract class PlatformCacheStoreCallback {
+    /** Platform context used to resolve memory pointers and to create readers. */
+    protected final PlatformContext ctx;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     */
+    protected PlatformCacheStoreCallback(PlatformContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /**
+     * Invoke the callback: opens a reader over the memory at {@code memPtr} and delegates to {@link #invoke0}.
+     *
+     * @param memPtr Memory pointer; the call is a no-op unless this value is positive.
+     */
+    public void invoke(long memPtr) {
+        if (memPtr > 0) { // NOTE(review): '> 0' rather than '!= 0' - confirm pointers are never negative.
+            try (PlatformMemory mem = ctx.memory().get(memPtr)) {
+                PortableRawReaderEx reader = ctx.reader(mem);
+
+                invoke0(reader);
+            }
+        }
+    }
+
+    /**
+     * Internal invoke routine, implemented by subclasses.
+     *
+     * @param reader Reader created over the memory that was passed to {@link #invoke(long)}.
+     */
+    protected abstract void invoke0(PortableRawReaderEx reader);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
new file mode 100644
index 0000000..a1c8516
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cluster;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCluster;
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.cluster.ClusterGroupEx;
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+
+/**
+ * Interop projection: wraps a {@link ClusterGroupEx} and dispatches stream operations by operation code.
+ */
+@SuppressWarnings({"UnusedDeclaration"})
+public class PlatformClusterGroup extends PlatformAbstractTarget {
+    /** Operation code: write all metadata. */
+    private static final int OP_ALL_METADATA = 1;
+
+    /** Operation code: create a new group via {@code forAttribute}. */
+    private static final int OP_FOR_ATTRIBUTE = 2;
+
+    /** Operation code: create a new group via {@code forCacheNodes}. */
+    private static final int OP_FOR_CACHE = 3;
+
+    /** Operation code: create a new group via {@code forClientNodes}. */
+    private static final int OP_FOR_CLIENT = 4;
+
+    /** Operation code: create a new group via {@code forDataNodes}. */
+    private static final int OP_FOR_DATA = 5;
+
+    /** Operation code: create a new group via {@code forHost}. */
+    private static final int OP_FOR_HOST = 6;
+
+    /** Operation code: create a new group via {@code forNodeIds}. */
+    private static final int OP_FOR_NODE_IDS = 7;
+
+    /** Operation code: write metadata for a single type ID. */
+    private static final int OP_METADATA = 8;
+
+    /** Operation code: write metrics of this projection. */
+    private static final int OP_METRICS = 9;
+
+    /** Operation code: write metrics of this projection restricted to the given node IDs. */
+    private static final int OP_METRICS_FILTERED = 10;
+
+    /** Operation code: write metrics of a single node looked up through discovery. */
+    private static final int OP_NODE_METRICS = 11;
+
+    /** Operation code: write projection nodes if the topology version is newer than the one supplied. */
+    private static final int OP_NODES = 12;
+
+    /** Operation code: ping a node by ID. */
+    private static final int OP_PING_NODE = 13;
+
+    /** Operation code: write the nodes of a given topology version. */
+    private static final int OP_TOPOLOGY = 14;
+
+    /** Wrapped projection that all operations are executed against. */
+    private final ClusterGroupEx prj;
+
+    /**
+     * Constructor.
+     *
+     * @param platformCtx Platform context.
+     * @param prj Projection to wrap.
+     */
+    public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) {
+        super(platformCtx);
+
+        this.prj = prj;
+    }
+
+    /** {@inheritDoc} Handles {@code OP_METRICS} and {@code OP_ALL_METADATA}; defers other types to super. */
+    @SuppressWarnings("deprecation")
+    @Override protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
+        switch (type) {
+            case OP_METRICS:
+                platformCtx.writeClusterMetrics(writer, prj.metrics());
+
+                break;
+
+            case OP_ALL_METADATA:
+                platformCtx.writeAllMetadata(writer);
+
+                break;
+
+            default:
+                super.processOutStream(type, writer);
+        }
+    }
+
+    /** {@inheritDoc} Handles metrics, nodes, metadata and topology operations; defers other types to super. */
+    @SuppressWarnings({"ConstantConditions", "deprecation"})
+    @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer)
+        throws IgniteCheckedException {
+        switch (type) {
+            case OP_METRICS_FILTERED: {
+                Collection<UUID> ids = PlatformUtils.readCollection(reader);
+
+                platformCtx.writeClusterMetrics(writer, prj.forNodeIds(ids).metrics());
+
+                break;
+            }
+
+            case OP_NODES: {
+                long oldTopVer = reader.readLong();
+
+                long curTopVer = platformCtx.kernalContext().discovery().topologyVersion();
+
+                if (curTopVer > oldTopVer) {
+                    writer.writeBoolean(true);
+
+                    writer.writeLong(curTopVer);
+
+                    // At this moment topology version might have advanced, and due to this race
+                    // we return outdated top ver to the caller. But this race is benign, the only
+                    // possible side effect is that the user will re-request nodes and we will return
+                    // the same set of nodes but with more recent topology version.
+                    Collection<ClusterNode> nodes = prj.nodes();
+
+                    platformCtx.writeNodes(writer, nodes);
+                }
+                else
+                    // No discovery events since last invocation.
+                    writer.writeBoolean(false);
+
+                break;
+            }
+
+            case OP_NODE_METRICS: {
+                UUID nodeId = reader.readUuid();
+
+                long lastUpdateTime = reader.readLong();
+
+                // Ask discovery because node might have been filtered out of current projection.
+                ClusterNode node = platformCtx.kernalContext().discovery().node(nodeId);
+
+                ClusterMetrics metrics = null;
+
+                if (node != null) {
+                    ClusterMetrics metrics0 = node.metrics();
+
+                    long triggerTime = lastUpdateTime + platformCtx.kernalContext().config().getMetricsUpdateFrequency();
+
+                    metrics = metrics0.getLastUpdateTime() > triggerTime ? metrics0 : null;
+                }
+
+                platformCtx.writeClusterMetrics(writer, metrics);
+
+                break;
+            }
+
+            case OP_METADATA: {
+                int typeId = reader.readInt();
+
+                platformCtx.writeMetadata(writer, typeId);
+
+                break;
+            }
+
+            case OP_TOPOLOGY: {
+                long topVer = reader.readLong();
+
+                platformCtx.writeNodes(writer, topology(topVer));
+
+                break;
+            }
+
+            default:
+                super.processInStreamOutStream(type, reader, writer);
+        }
+    }
+
+    /** {@inheritDoc} Handles {@code OP_PING_NODE} (result is {@code TRUE} or {@code FALSE}); defers the rest to super. */
+    @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_PING_NODE:
+                return pingNode(reader.readUuid()) ? TRUE : FALSE;
+
+            default:
+                return super.processInStreamOutLong(type, reader);
+        }
+    }
+
+    /** {@inheritDoc} Handles the {@code OP_FOR_*} operations, each returning a new group; defers the rest to super. */
+    @Override protected Object processInStreamOutObject(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_FOR_NODE_IDS: {
+                Collection<UUID> ids = PlatformUtils.readCollection(reader);
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forNodeIds(ids));
+            }
+
+            case OP_FOR_ATTRIBUTE:
+                return new PlatformClusterGroup(platformCtx,
+                    (ClusterGroupEx)prj.forAttribute(reader.readString(), reader.readString()));
+
+            case OP_FOR_CACHE: {
+                String cacheName = reader.readString();
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forCacheNodes(cacheName));
+            }
+
+            case OP_FOR_CLIENT: {
+                String cacheName = reader.readString();
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forClientNodes(cacheName));
+            }
+
+            case OP_FOR_DATA: {
+                String cacheName = reader.readString();
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forDataNodes(cacheName));
+            }
+
+            case OP_FOR_HOST: {
+                UUID nodeId = reader.readUuid();
+
+                ClusterNode node = prj.node(nodeId); // NOTE(review): not null-checked, unlike OP_NODE_METRICS - confirm.
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx) prj.forHost(node));
+            }
+
+            default:
+                return super.processInStreamOutObject(type, reader);
+        }
+    }
+
+    /**
+     * @param exclude Group whose wrapped projection is passed to {@code forOthers}.
+     * @return New group wrapping the result of {@code forOthers} on this projection.
+     */
+    public PlatformClusterGroup forOthers(PlatformClusterGroup exclude) {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forOthers(exclude.prj));
+    }
+
+    /**
+     * @return New group wrapping the result of {@code forRemotes} on this projection.
+     */
+    public PlatformClusterGroup forRemotes() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRemotes());
+    }
+
+    /**
+     * @return New group wrapping the result of {@code forDaemons} on this projection.
+     */
+    public PlatformClusterGroup forDaemons() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forDaemons());
+    }
+
+    /**
+     * @return New group wrapping the result of {@code forRandom} on this projection.
+     */
+    public PlatformClusterGroup forRandom() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRandom());
+    }
+
+    /**
+     * @return New group wrapping the result of {@code forOldest} on this projection.
+     */
+    public PlatformClusterGroup forOldest() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forOldest());
+    }
+
+    /**
+     * @return New group wrapping the result of {@code forYoungest} on this projection.
+     */
+    public PlatformClusterGroup forYoungest() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forYoungest());
+    }
+
+    /**
+     * @return Wrapped projection.
+     */
+    public ClusterGroupEx projection() {
+        return prj;
+    }
+
+    /**
+     * Resets local I/O, job, and task execution metrics by delegating to {@link IgniteCluster#resetMetrics()}.
+     */
+    public void resetMetrics() {
+        assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.
+
+        ((IgniteCluster)prj).resetMetrics();
+    }
+
+    /**
+     * Pings the node with the given ID by delegating to {@code IgniteCluster.pingNode}; returns its result.
+     */
+    private boolean pingNode(UUID nodeId) {
+        assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.
+
+        return ((IgniteCluster)prj).pingNode(nodeId);
+    }
+
+    /**
+     * Gets a topology by version. Returns {@code null} if topology history storage doesn't contain
+     * specified topology version (history currently keeps last {@code 1000} snapshots).
+     *
+     * @param topVer Topology version.
+     * @return Collection of grid nodes which are represented by the specified topology version,
+     * if it is present in history storage, {@code null} otherwise.
+     * @throws UnsupportedOperationException If underlying SPI implementation does not support
+     *      topology history. Currently only {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
+     *      supports topology history.
+     */
+    private Collection<ClusterNode> topology(long topVer) {
+        assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.
+
+        return ((IgniteCluster)prj).topology(topVer);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java
new file mode 100644
index 0000000..5ba9a85
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cluster;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * Interop cluster node filter; evaluates the predicate through the platform gateway.
+ */
+public class PlatformClusterNodeFilterImpl extends PlatformAbstractPredicate implements PlatformClusterNodeFilter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * {@link java.io.Externalizable} support.
+     */
+    public PlatformClusterNodeFilterImpl() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param pred .Net portable predicate.
+     * @param ctx Platform context.
+     */
+    public PlatformClusterNodeFilterImpl(Object pred, PlatformContext ctx) {
+        super(pred, 0, ctx);
+    }
+
+    /** {@inheritDoc} Writes the predicate and the node to platform memory; true if the gateway returns non-zero. */
+    @Override public boolean apply(ClusterNode clusterNode) {
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObject(pred);
+            ctx.writeNode(writer, clusterNode);
+
+            out.synchronize();
+
+            return ctx.gateway().clusterNodeFilterApply(mem.pointer()) != 0;
+        }
+    }
+
+    /**
+     * @param ignite Ignite instance from which the platform context is obtained.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    @IgniteInstanceResource
+    public void setIgniteInstance(Ignite ignite) {
+        ctx = PlatformUtils.platformContext(ignite);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
new file mode 100644
index 0000000..bf9d9e4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import java.io.Externalizable;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Base interop job; subclasses implement {@link #execute0} and may use {@link #createJob} and {@link #runLocal}.
+ */
+public abstract class PlatformAbstractJob implements PlatformJob, Externalizable {
+    /** Marker object denoting the job execution result is stored in native platform. */
+    static final Object LOC_JOB_RES = new Object();
+
+    /** Ignite instance (annotated for resource injection). */
+    @IgniteInstanceResource
+    protected transient Ignite ignite;
+
+    /** Parent task; present only on local job instance. */
+    protected transient PlatformAbstractTask task;
+
+    /** Pointer to job in the native platform; {@link #createJob} creates the native job only when this is 0. */
+    protected transient long ptr;
+
+    /** Job object that is written to the platform when the native job is created. */
+    protected Object job;
+
+    /**
+     * {@link java.io.Externalizable} support.
+     */
+    protected PlatformAbstractJob() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param task Parent task.
+     * @param ptr Pointer to the job in the native platform.
+     * @param job Job.
+     */
+    protected PlatformAbstractJob(PlatformAbstractTask task, long ptr, Object job) {
+        this.task = task;
+        this.ptr = ptr;
+        this.job = job;
+    }
+
+    /** {@inheritDoc} Awaits platform processor start, then calls {@link #execute0}; converts checked exceptions. */
+    @Nullable @Override public Object execute() {
+        try {
+            PlatformProcessor interopProc = PlatformUtils.platformProcessor(ignite);
+
+            interopProc.awaitStart();
+
+            return execute0(interopProc.context());
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /**
+     * Internal job execution routine.
+     *
+     * @param ctx Platform context.
+     * @return Result.
+     * @throws org.apache.ignite.IgniteCheckedException If failed.
+     */
+    protected abstract Object execute0(PlatformContext ctx) throws IgniteCheckedException;
+
+    /**
+     * Create job in native platform if needed, i.e. when {@link #ptr} is 0; the new pointer is stored in {@link #ptr}.
+     *
+     * @param ctx Context.
+     * @return {@code True} if job was created, {@code false} if this is local job and creation is not necessary.
+     * @throws org.apache.ignite.IgniteCheckedException If failed.
+     */
+    protected boolean createJob(PlatformContext ctx) throws IgniteCheckedException {
+        if (ptr == 0) {
+            try (PlatformMemory mem = ctx.memory().allocate()) {
+                PlatformOutputStream out = mem.output();
+
+                PortableRawWriterEx writer = ctx.writer(out);
+
+                writer.writeObject(job);
+
+                out.synchronize();
+
+                ptr = ctx.gateway().computeJobCreate(mem.pointer());
+            }
+
+            return true;
+        }
+        else
+            return false;
+    }
+
+    /**
+     * Run local job while holding the parent task's job lock.
+     *
+     * @param ctx Context.
+     * @param cancel Cancel flag; passed to the gateway as 1 when set and 0 otherwise.
+     * @return {@link #LOC_JOB_RES} if the job was executed, {@code null} if the task could not be locked.
+     */
+    protected Object runLocal(PlatformContext ctx, boolean cancel) {
+        // Local job, must execute it with respect to possible concurrent task completion.
+        if (task.onJobLock()) {
+            try {
+                ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, 0);
+
+                return LOC_JOB_RES;
+            }
+            finally {
+                task.onJobUnlock();
+            }
+        }
+        else
+            // Task has completed concurrently, no need to run the job.
+            return null;
+    }
+
+    /** {@inheritDoc} Returns the stored native job pointer. */
+    @Override public long pointer() {
+        return ptr;
+    }
+
+    /** {@inheritDoc} Returns the stored job object. */
+    @Override public Object job() {
+        return job;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
new file mode 100644
index 0000000..b17dd97
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformNativeException;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.internal.util.typedef.X;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Base class for all interop tasks; forwards job results, reduce and completion to the platform gateway.
+ */
+public abstract class PlatformAbstractTask implements ComputeTask<Object, Void> {
+    /** Platform context. */
+    protected final PlatformContext ctx;
+
+    /** Pointer to the task in the native platform. */
+    protected final long taskPtr;
+
+    /** Lock for safe access to native pointers: read lock for result/reduce/jobs, write lock for completion. */
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /** Done flag; set by {@link #onDone(Exception)} while the write lock is held. */
+    protected boolean done;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     * @param taskPtr Task pointer.
+     */
+    protected PlatformAbstractTask(PlatformContext ctx, long taskPtr) {
+        this.ctx = ctx;
+        this.taskPtr = taskPtr;
+    }
+
+    /** {@inheritDoc} Passes the job result to the gateway under the read lock; maps the returned ordinal to a policy. */
+    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+        assert rcvd.isEmpty() : "Should not cache result in Java for interop task";
+
+        int plc;
+
+        lock.readLock().lock();
+
+        try {
+            assert !done;
+
+            PlatformAbstractJob job = res.getJob();
+
+            assert job.pointer() != 0;
+
+            Object res0bj = res.getData();
+
+            if (res0bj == PlatformAbstractJob.LOC_JOB_RES)
+                // Processing local job execution result.
+                plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), 0);
+            else {
+                // Processing remote job execution result or exception.
+                try (PlatformMemory mem = ctx.memory().allocate()) {
+                    PlatformOutputStream out = mem.output();
+
+                    PortableRawWriterEx writer = ctx.writer(out);
+
+                    writer.writeUuid(res.getNode().id());
+                    writer.writeBoolean(res.isCancelled());
+
+                    IgniteException err = res.getException();
+
+                    PlatformUtils.writeInvocationResult(writer, res0bj, err);
+
+                    out.synchronize();
+
+                    plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), mem.pointer());
+                }
+            }
+
+            ComputeJobResultPolicy plc0 = ComputeJobResultPolicy.fromOrdinal((byte) plc);
+
+            assert plc0 != null : plc;
+
+            return plc0;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} Calls {@code computeTaskReduce} on the gateway under the read lock; always returns null. */
+    @Nullable @Override public Void reduce(List<ComputeJobResult> results) {
+        assert results.isEmpty() : "Should not cache result in java for interop task";
+
+        lock.readLock().lock();
+
+        try {
+            assert !done;
+
+            ctx.gateway().computeTaskReduce(taskPtr);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+
+        return null;
+    }
+
+    /**
+     * Callback invoked when task future is completed and all resources could be safely cleaned up.
+     *
+     * @param e Failure, or {@code null} on normal completion.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    public void onDone(Exception e) {
+        lock.writeLock().lock();
+
+        try {
+            assert !done;
+
+            if (e == null)
+                // Normal completion.
+                ctx.gateway().computeTaskComplete(taskPtr, 0);
+            else {
+                PlatformNativeException e0 = X.cause(e, PlatformNativeException.class);
+
+                try (PlatformMemory mem = ctx.memory().allocate()) {
+                    PlatformOutputStream out = mem.output();
+
+                    PortableRawWriterEx writer = ctx.writer(out);
+
+                    if (e0 == null) {
+                        writer.writeBoolean(false);
+                        writer.writeString(e.getClass().getName());
+                        writer.writeString(e.getMessage());
+                    }
+                    else {
+                        writer.writeBoolean(true);
+                        writer.writeObject(e0.cause());
+                    }
+
+                    out.synchronize();
+
+                    ctx.gateway().computeTaskComplete(taskPtr, mem.pointer());
+                }
+            }
+        }
+        finally {
+            // Done flag is set irrespective of any exceptions.
+            done = true;
+
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Callback invoked by job when it wants to lock the task; on {@code true} the read lock remains held.
+     *
+     * @return {@code true} if task is not completed yet, {@code false} otherwise (the read lock is then released).
+     */
+    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+    public boolean onJobLock() {
+        lock.readLock().lock();
+
+        if (done) {
+            lock.readLock().unlock();
+
+            return false;
+        }
+        else
+            return true;
+    }
+
+    /**
+     * Callback invoked by job when task can be unlocked; releases the read lock.
+     */
+    public void onJobUnlock() {
+        assert !done;
+
+        lock.readLock().unlock();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
new file mode 100644
index 0000000..5570586
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeLoadBalancer;
+import org.apache.ignite.compute.ComputeTaskNoResultCache;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.resources.LoadBalancerResource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Interop multi-closure task with node balancing: each job is mapped to a node picked by the load balancer.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBalancingMultiClosureTask extends PlatformAbstractTask {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Jobs to map; assigned through {@link #jobs(Collection)}. */
+    private Collection<PlatformJob> jobs;
+
+    /** Load balancer asked for a node for every job. */
+    @SuppressWarnings("UnusedDeclaration")
+    @LoadBalancerResource
+    private ComputeLoadBalancer lb;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     * @param taskPtr Task pointer.
+     */
+    public PlatformBalancingMultiClosureTask(PlatformContext ctx, long taskPtr) {
+        super(ctx, taskPtr);
+    }
+
+    /** {@inheritDoc} Returns an empty map when the subgrid is empty. */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable Object arg) {
+        assert !F.isEmpty(jobs) : "Jobs emptiness must be checked in native platform.";
+
+        if (!F.isEmpty(subgrid)) {
+            Map<ComputeJob, ClusterNode> map = new HashMap<>(jobs.size(), 1);
+
+            for (PlatformJob job : jobs)
+                map.put(job, lb.getBalancedNode(job, null));
+
+            return map;
+        }
+        else
+            return Collections.emptyMap();
+    }
+
+    /**
+     * @param jobs Jobs to be mapped; {@link #map(List, Object)} asserts that they are not empty.
+     */
+    public void jobs(Collection<PlatformJob> jobs) {
+        this.jobs = jobs;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java
new file mode 100644
index 0000000..a168144
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureAffinityTask.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeTaskNoResultCache;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Interop single-closure task which maps its job to the node resolved from a cache affinity key.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBalancingSingleClosureAffinityTask extends PlatformAbstractTask {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Job; assigned through {@link #job(PlatformJob)}. */
+    private PlatformJob job;
+
+    /** Node, according to affinity; assigned in {@link #affinity(String, Object, GridKernalContext)}. */
+    private ClusterNode node;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     * @param taskPtr Task pointer.
+     */
+    public PlatformBalancingSingleClosureAffinityTask(PlatformContext ctx, long taskPtr) {
+        super(ctx, taskPtr);
+    }
+
+    /** {@inheritDoc} Maps the single job to the affinity node; {@code subgrid} is not consulted. */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable Object arg) {
+        assert job != null : "Job null-check must be performed in native platform.";
+
+        return Collections.singletonMap(job, node);
+    }
+
+    /**
+     * @param job Job; {@link #map(List, Object)} asserts that it is not {@code null}.
+     */
+    public void job(PlatformJob job) {
+        this.job = job;
+    }
+
+    /**
+     * Init affinity: resolves the affinity key of the given key and remembers the node that key maps to.
+     *
+     * @param cacheName Cache name.
+     * @param affKey Affinity key.
+     * @param ctx Kernal context; a checked failure of its affinity calls is rethrown via {@code U.convertException}.
+     */
+    public void affinity(String cacheName, Object affKey, GridKernalContext ctx) {
+        try {
+            final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+
+            node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
new file mode 100644
index 0000000..3f1d66a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeLoadBalancer;
+import org.apache.ignite.compute.ComputeTaskNoResultCache;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.resources.LoadBalancerResource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Interop single-closure task with node balancing: the job is mapped to a node picked by the load balancer.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBalancingSingleClosureTask extends PlatformAbstractTask {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Job; assigned through {@link #job(PlatformJob)}. */
+    private PlatformJob job;
+
+    /** Load balancer asked for the node of the job. */
+    @SuppressWarnings("UnusedDeclaration")
+    @LoadBalancerResource
+    private ComputeLoadBalancer lb;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     * @param taskPtr Task pointer.
+     */
+    public PlatformBalancingSingleClosureTask(PlatformContext ctx, long taskPtr) {
+        super(ctx, taskPtr);
+    }
+
+    /** {@inheritDoc} Returns an empty map when the subgrid is empty. */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable Object arg) {
+        assert job != null : "Job null-check must be performed in native platform.";
+
+        if (!F.isEmpty(subgrid)) {
+            Map<ComputeJob, ClusterNode> map = new HashMap<>(1, 1);
+
+            map.put(job, lb.getBalancedNode(job, null));
+
+            return map;
+        }
+        else
+            return Collections.emptyMap();
+    }
+
+    /**
+     * @param job Job; {@link #map(List, Object)} asserts that it is not {@code null}.
+     */
+    public void job(PlatformJob job) {
+        this.job = job;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
new file mode 100644
index 0000000..d2bd0ac
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeTaskNoResultCache;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Interop multi-closure task with broadcast semantics: every job is mapped to every node of the subgrid.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBroadcastingMultiClosureTask extends PlatformAbstractTask {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Jobs to broadcast; assigned through {@link #jobs(Collection)}. */
+    private Collection<PlatformJob> jobs;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     * @param taskPtr Task pointer.
+     */
+    public PlatformBroadcastingMultiClosureTask(PlatformContext ctx, long taskPtr) {
+        super(ctx, taskPtr);
+    }
+
+    /** {@inheritDoc} The first node gets the job itself, other nodes get new closure jobs built from it. */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable Object arg) {
+        assert !F.isEmpty(jobs) : "Jobs emptiness must be checked in native platform.";
+
+        if (!F.isEmpty(subgrid)) {
+            Map<ComputeJob, ClusterNode> map = new HashMap<>(jobs.size() * subgrid.size(), 1);
+
+            for (PlatformJob job : jobs) {
+                boolean first = true;
+
+                for (ClusterNode node : subgrid) {
+                    if (first) {
+                        map.put(job, node);
+
+                        first = false;
+                    }
+                    else
+                        map.put(ctx.createClosureJob(this, job.pointer(), job.job()), node);
+                }
+            }
+
+            return map;
+        }
+        else
+            return Collections.emptyMap();
+    }
+
+    /**
+     * @param jobs Jobs to be broadcast; {@link #map(List, Object)} asserts that they are not empty.
+     */
+    public void jobs(Collection<PlatformJob> jobs) {
+        this.jobs = jobs;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
new file mode 100644
index 0000000..0736988
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeTaskNoResultCache;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Interop single-closure task with broadcast semantics: the job is mapped to every node of the subgrid.
+ */
+@ComputeTaskNoResultCache
+public class PlatformBroadcastingSingleClosureTask extends PlatformAbstractTask {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Job; assigned through {@link #job(PlatformJob)}. */
+    private PlatformJob job;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     * @param taskPtr Task pointer.
+     */
+    public PlatformBroadcastingSingleClosureTask(PlatformContext ctx, long taskPtr) {
+        super(ctx, taskPtr);
+    }
+
+    /** {@inheritDoc} The first node gets the job itself, other nodes get new closure jobs built from it. */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable Object arg) {
+        assert job != null : "Job null-check must be performed in native platform.";
+
+        if (!F.isEmpty(subgrid)) {
+            Map<ComputeJob, ClusterNode> map = new HashMap<>(subgrid.size(), 1);
+
+            boolean first = true;
+
+            for (ClusterNode node : subgrid) {
+                if (first) {
+                    map.put(job, node);
+
+                    first = false;
+                }
+                else
+                    map.put(ctx.createClosureJob(this, job.pointer(), job.job()), node);
+            }
+
+            return map;
+        }
+        else
+            return Collections.emptyMap();
+    }
+
+    /**
+     * @param job Job; {@link #map(List, Object)} asserts that it is not {@code null}.
+     */
+    public void job(PlatformJob job) {
+        this.job = job;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
new file mode 100644
index 0000000..9bd7d60
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Light-weight interop job. Compared to a regular job, it has simpler logic because it does not have to
+ * bother with delayed serialization and cancellation.
+ */
+public class PlatformClosureJob extends PlatformAbstractJob {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * {@link java.io.Externalizable} support.
+     */
+    public PlatformClosureJob() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param task Parent task.
+     * @param ptr Job pointer.
+     * @param job Job.
+     */
+    public PlatformClosureJob(PlatformAbstractTask task, long ptr, Object job) {
+        super(task, ptr, job);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Object execute0(PlatformContext ctx) throws IgniteCheckedException {
+        if (task == null) {
+            // Remote job execution (no parent task): the job is created, executed and finally destroyed here.
+            assert ptr == 0;
+
+            createJob(ctx);
+
+            try (PlatformMemory mem = ctx.memory().allocate()) {
+                PlatformInputStream in = mem.input();
+
+                ctx.gateway().computeJobExecute(ptr, 0, mem.pointer());
+
+                in.synchronize();
+
+                PortableRawReaderEx reader = ctx.reader(in);
+
+                return PlatformUtils.readInvocationResult(ctx, reader);
+            }
+            finally {
+                ctx.gateway().computeJobDestroy(ptr);
+            }
+        }
+        else {
+            // Local job execution (parent task is present): the job pointer must already be set.
+            assert ptr != 0;
+
+            return runLocal(ctx, false);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} Only the job object is written. */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        assert job != null;
+
+        out.writeObject(job);
+    }
+
+    /** {@inheritDoc} Only the job object is read. */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        job = in.readObject();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
new file mode 100644
index 0000000..638b4b1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.internal.IgniteComputeImpl;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.portable.PortableObjectImpl;
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.portable.PortableObject;
+
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
+
+/**
+ * Interop compute: executes native closures, native full-fledged tasks and Java tasks on behalf of the platform.
+ */
+@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored", "UnusedDeclaration"})
+public class PlatformCompute extends PlatformAbstractTarget {
+    /** Operation code: closure execution routed by affinity key. */
+    private static final int OP_AFFINITY = 1;
+
+    /** Operation code: closure execution with broadcast. */
+    private static final int OP_BROADCAST = 2;
+
+    /** Operation code: synchronous Java task execution. */
+    private static final int OP_EXEC = 3;
+
+    /** Operation code: asynchronous Java task execution. */
+    private static final int OP_EXEC_ASYNC = 4;
+
+    /** Operation code: closure execution with neither broadcast nor affinity. */
+    private static final int OP_UNICAST = 5;
+
+    /** Compute instance. */
+    private final IgniteComputeImpl compute;
+
+    /** Future for previous asynchronous operation (kept per thread). */
+    protected ThreadLocal<IgniteFuture<?>> curFut = new ThreadLocal<>();
+    /**
+     * Constructor.
+     *
+     * @param platformCtx Context.
+     * @param compute Compute instance.
+     */
+    public PlatformCompute(PlatformContext platformCtx, IgniteComputeImpl compute) {
+        super(platformCtx);
+
+        this.compute = compute;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_UNICAST:
+                processClosures(reader.readLong(), reader, false, false);
+
+                return TRUE;
+
+            case OP_BROADCAST:
+                processClosures(reader.readLong(), reader, true, false);
+
+                return TRUE;
+
+            case OP_AFFINITY:
+                processClosures(reader.readLong(), reader, false, true);
+
+                return TRUE;
+
+            default:
+                return super.processInStreamOutLong(type, reader);
+        }
+    }
+
+    /**
+     * Process closure execution request: build the task matching the closure count and flags, then execute it.
+     * @param taskPtr Task pointer.
+     * @param reader Reader.
+     * @param broadcast Broadcast flag.
+     * @param affinity Affinity flag; only consulted when exactly one closure is sent and broadcast is not set.
+     */
+    private void processClosures(long taskPtr, PortableRawReaderEx reader, boolean broadcast, boolean affinity) {
+        PlatformAbstractTask task;
+
+        int size = reader.readInt();
+
+        if (size == 1) {
+            if (broadcast) {
+                PlatformBroadcastingSingleClosureTask task0 =
+                    new PlatformBroadcastingSingleClosureTask(platformCtx, taskPtr);
+
+                task0.job(nextClosureJob(task0, reader));
+
+                task = task0;
+            }
+            else if (affinity) {
+                PlatformBalancingSingleClosureAffinityTask task0 =
+                    new PlatformBalancingSingleClosureAffinityTask(platformCtx, taskPtr);
+
+                task0.job(nextClosureJob(task0, reader));
+
+                task0.affinity(reader.readString(), reader.readObjectDetached(), platformCtx.kernalContext());
+
+                task = task0;
+            }
+            else {
+                PlatformBalancingSingleClosureTask task0 = new PlatformBalancingSingleClosureTask(platformCtx, taskPtr);
+
+                task0.job(nextClosureJob(task0, reader));
+
+                task = task0;
+            }
+        }
+        else {
+            if (broadcast)
+                task = new PlatformBroadcastingMultiClosureTask(platformCtx, taskPtr);
+            else
+                task = new PlatformBalancingMultiClosureTask(platformCtx, taskPtr);
+
+            Collection<PlatformJob> jobs = new ArrayList<>(size);
+
+            for (int i = 0; i < size; i++)
+                jobs.add(nextClosureJob(task, reader));
+
+            if (broadcast)
+                ((PlatformBroadcastingMultiClosureTask)task).jobs(jobs);
+            else
+                ((PlatformBalancingMultiClosureTask)task).jobs(jobs);
+        }
+
+        platformCtx.kernalContext().task().setThreadContext(TC_SUBGRID, compute.clusterGroup().nodes());
+
+        executeNative0(task);
+    }
+
+    /**
+     * Read the next closure job (job pointer followed by the detached job object) from the reader.
+     *
+     * @param task Task.
+     * @param reader Reader.
+     * @return Closure job.
+     */
+    private PlatformJob nextClosureJob(PlatformAbstractTask task, PortableRawReaderEx reader) {
+        return platformCtx.createClosureJob(task, reader.readLong(), reader.readObjectDetached());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer)
+        throws IgniteCheckedException {
+        switch (type) {
+            case OP_EXEC:
+                writer.writeObjectDetached(executeJavaTask(reader, false));
+
+                break;
+
+            case OP_EXEC_ASYNC:
+                writer.writeObjectDetached(executeJavaTask(reader, true));
+
+                break;
+
+            default:
+                super.processInStreamOutStream(type, reader, writer);
+        }
+    }
+
+    /**
+     * Execute native full-fledged task.
+     *
+     * @param taskPtr Pointer to the task.
+     * @param topVer Topology version.
+     */
+    public void executeNative(long taskPtr, long topVer) {
+        final PlatformFullTask task = new PlatformFullTask(platformCtx, compute, taskPtr, topVer);
+
+        executeNative0(task);
+    }
+
+    /**
+     * Set "withTimeout" state.
+     *
+     * @param timeout Timeout (milliseconds).
+     */
+    public void withTimeout(long timeout) {
+        compute.withTimeout(timeout);
+    }
+
+    /**
+     * Set "withNoFailover" state.
+     */
+    public void withNoFailover() {
+        compute.withNoFailover();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
+        IgniteFuture<?> fut = curFut.get();
+
+        if (fut == null)
+            throw new IllegalStateException("Asynchronous operation not started.");
+
+        return fut;
+    }
+
+    /**
+     * Execute task asynchronously; on completion the task is notified through {@code onDone} with the error, if any.
+     *
+     * @param task Task.
+     */
+    private void executeNative0(final PlatformAbstractTask task) {
+        IgniteInternalFuture fut = compute.executeAsync(task, null);
+
+        fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+            private static final long serialVersionUID = 0L;
+
+            @Override public void apply(IgniteInternalFuture fut) {
+                try {
+                    fut.get();
+
+                    task.onDone(null);
+                }
+                catch (IgniteCheckedException e) {
+                    task.onDone(e);
+                }
+            }
+        });
+    }
+
+    /**
+     * Execute task taking arguments (name, keep-portable flag, argument, node IDs) from the given reader.
+     * @param reader Reader.
+     * @param async Async flag; when set, the chained result future is stored in {@link #curFut}.
+     * @return Task result in portable form, or {@code null} if {@code async} is set.
+     */
+    protected Object executeJavaTask(PortableRawReaderEx reader, boolean async) {
+        String taskName = reader.readString();
+        boolean keepPortable = reader.readBoolean();
+        Object arg = reader.readObjectDetached();
+
+        Collection<UUID> nodeIds = readNodeIds(reader);
+
+        IgniteCompute compute0 = computeForTask(nodeIds);
+
+        if (async)
+            compute0 = compute0.withAsync();
+
+        if (!keepPortable && arg instanceof PortableObjectImpl)
+            arg = ((PortableObject)arg).deserialize();
+
+        Object res = compute0.execute(taskName, arg);
+
+        if (async) {
+            curFut.set(compute0.future().chain(new C1<IgniteFuture, Object>() {
+                private static final long serialVersionUID = 0L;
+
+                @Override public Object apply(IgniteFuture fut) {
+                    return toPortable(fut.get());
+                }
+            }));
+
+            return null;
+        }
+        else
+            return toPortable(res);
+    }
+
+    /**
+     * Convert object to portable form.
+     *
+     * @param src Source object.
+     * @return Result.
+     */
+    private Object toPortable(Object src) {
+        return platformCtx.kernalContext().grid().portables().toPortable(src);
+    }
+
+    /**
+     * Read node IDs: a presence flag, then the count, then that many UUIDs.
+     *
+     * @param reader Reader.
+     * @return Node IDs, or {@code null} if the presence flag is not set.
+     */
+    protected Collection<UUID> readNodeIds(PortableRawReaderEx reader) {
+        if (reader.readBoolean()) {
+            int len = reader.readInt();
+
+            List<UUID> res = new ArrayList<>(len);
+
+            for (int i = 0; i < len; i++)
+                res.add(reader.readUuid());
+
+            return res;
+        }
+        else
+            return null;
+    }
+
+    /**
+     * Get compute object for the given node IDs.
+     *
+     * @param nodeIds Node IDs; {@code null} selects this instance's own compute object.
+     * @return Compute object.
+     */
+    protected IgniteCompute computeForTask(Collection<UUID> nodeIds) {
+        return nodeIds == null ? compute :
+            platformCtx.kernalContext().grid().compute(compute.clusterGroup().forNodeIds(nodeIds));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
new file mode 100644
index 0000000..cfed735
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper around job created in native platform.
+ * <p>
+ * If the job is expected to be executed locally, it contains only pointer to the corresponding entity in the native
+ * platform. In case of topology change or failover, job is serialized on demand.
+ * <p>
+ * If we know in advance that the job is to be executed on remote node, then it is serialized into byte array right
+ * away.
+ * <p>
+ * This class is not thread safe.
+ */
+@SuppressWarnings({"FieldCanBeLocal"})
+public class PlatformFullJob extends PlatformAbstractJob {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Job is initialized. */
+    private static final byte STATE_INIT = 0;
+
+    /** Job is running. */
+    private static final byte STATE_RUNNING = 1;
+
+    /** Job execution completed. */
+    private static final byte STATE_COMPLETED = 2;
+
+    /** Job cancelled. */
+    private static final byte STATE_CANCELLED = 3;
+
+    /** Platform context. */
+    private transient PlatformContext ctx;
+
+    /** Current job state (one of the {@code STATE_*} constants). */
+    private transient byte state;
+
+    /**
+     * {@link java.io.Externalizable} support.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public PlatformFullJob() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     * @param task Parent task.
+     * @param ptr Job pointer.
+     * @param job Job.
+     */
+    public PlatformFullJob(PlatformContext ctx, PlatformAbstractTask task, long ptr, Object job) {
+        super(task, ptr, job);
+
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Object execute0(PlatformContext ctx) throws IgniteCheckedException {
+        boolean cancel = false;
+
+        synchronized (this) {
+            // 1. Create job if necessary.
+            if (task == null) {
+                assert ptr == 0;
+
+                createJob(ctx);
+            }
+            else
+                assert ptr != 0;
+
+            // 2. Set correct state.
+            if (state == STATE_INIT)
+                state = STATE_RUNNING;
+            else {
+                assert state == STATE_CANCELLED;
+
+                cancel = true;
+            }
+        }
+
+        try {
+            if (task != null)
+                return runLocal(ctx, cancel);
+            else {
+                try (PlatformMemory mem = ctx.memory().allocate()) {
+                    PlatformInputStream in = mem.input();
+
+                    ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, mem.pointer());
+
+                    in.synchronize();
+
+                    PortableRawReaderEx reader = ctx.reader(in);
+
+                    return PlatformUtils.readInvocationResult(ctx, reader);
+                }
+            }
+        }
+        finally {
+            synchronized (this) {
+                if (task == null) {
+                    assert ptr != 0;
+
+                    ctx.gateway().computeJobDestroy(ptr);
+                }
+
+                if (state == STATE_RUNNING)
+                    state = STATE_COMPLETED;
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        PlatformProcessor proc = PlatformUtils.platformProcessor(ignite);
+
+        synchronized (this) {
+            if (state == STATE_INIT)
+                state = STATE_CANCELLED;
+            else if (state == STATE_RUNNING) {
+                assert ptr != 0;
+
+                try {
+                    proc.context().gateway().computeJobCancel(ptr);
+                }
+                finally {
+                    state = STATE_CANCELLED;
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        if (job == null) {
+            assert ptr != 0;
+
+            try {
+                if (task != null) {
+                    if (task.onJobLock()) {
+                        try {
+                            serialize();
+                        }
+                        finally {
+                            task.onJobUnlock();
+                        }
+                    }
+                    else
+                        throw new IgniteCheckedException("Task already completed: " + task);
+                }
+                else
+                    serialize();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IOException("Failed to serialize interop job.", e);
+            }
+        }
+
+        assert job != null;
+
+        out.writeObject(job);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        job = in.readObject();
+    }
+
+    /**
+     * Internal job serialization routine.
+     *
+     * @throws org.apache.ignite.IgniteCheckedException If failed.
+     */
+    private void serialize() throws IgniteCheckedException {
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformInputStream in = mem.input();
+
+            boolean res = ctx.gateway().computeJobSerialize(ptr, mem.pointer()) == 1;
+
+            in.synchronize();
+
+            PortableRawReaderEx reader = ctx.reader(in);
+
+            if (res)
+                job = reader.readObjectDetached();
+            else
+                throw new IgniteCheckedException(reader.readString());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
new file mode 100644
index 0000000..b96d445
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.compute;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeTaskNoResultCache;
+import org.apache.ignite.internal.IgniteComputeImpl;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Interop task which requires full execution cycle.
+ */
+@ComputeTaskNoResultCache
+public final class PlatformFullTask extends PlatformAbstractTask {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Initial topology version. */
+    private final long topVer;
+
+    /** Compute instance. */
+    private final IgniteComputeImpl compute;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Platform context.
+     * @param compute Target compute instance.
+     * @param taskPtr Pointer to the task in the native platform.
+     * @param topVer Initial topology version.
+     */
+    public PlatformFullTask(PlatformContext ctx, IgniteComputeImpl compute, long taskPtr, long topVer) {
+        super(ctx, taskPtr);
+
+        this.compute = compute;
+        this.topVer = topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable Object arg) {
+        assert arg == null;
+
+        lock.readLock().lock();
+
+        try {
+            assert !done;
+
+            Collection<ClusterNode> nodes = compute.clusterGroup().nodes();
+
+            PlatformMemoryManager memMgr = ctx.memory();
+
+            try (PlatformMemory outMem = memMgr.allocate()) {
+                PlatformOutputStream out = outMem.output();
+
+                PortableRawWriterEx writer = ctx.writer(out);
+
+                write(writer, nodes, subgrid);
+
+                out.synchronize();
+
+                try (PlatformMemory inMem = memMgr.allocate()) {
+                    PlatformInputStream in = inMem.input();
+
+                    ctx.gateway().computeTaskMap(taskPtr, outMem.pointer(), inMem.pointer());
+
+                    in.synchronize();
+
+                    PortableRawReaderEx reader = ctx.reader(in);
+
+                    return read(reader, nodes);
+                }
+            }
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Write topology information.
+     *
+     * @param writer Writer.
+     * @param nodes Current topology nodes.
+     * @param subgrid Subgrid.
+     */
+    private void write(PortableRawWriterEx writer, Collection<ClusterNode> nodes, List<ClusterNode> subgrid) {
+        GridDiscoveryManager discoMgr = ctx.kernalContext().discovery();
+
+        long curTopVer = discoMgr.topologyVersion();
+
+        if (topVer != curTopVer) {
+            writer.writeBoolean(true);
+
+            writer.writeLong(curTopVer);
+
+            writer.writeInt(nodes.size());
+
+            // Write subgrid size for more precise collection allocation on native side.
+            writer.writeInt(subgrid.size());
+
+            for (ClusterNode node : nodes) {
+                ctx.writeNode(writer, node);
+                writer.writeBoolean(subgrid.contains(node));
+            }
+        }
+        else
+            writer.writeBoolean(false);
+    }
+
+    /**
+     * Read map result.
+     *
+     * @param reader Reader.
+     * @param nodes Current topology nodes.
+     * @return Map result.
+     */
+    private Map<ComputeJob, ClusterNode> read(PortableRawReaderEx reader, Collection<ClusterNode> nodes) {
+        if (reader.readBoolean()) {
+            if (!reader.readBoolean())
+                return null;
+
+            int size = reader.readInt();
+
+            Map<ComputeJob, ClusterNode> map = U.newHashMap(size);
+
+            for (int i = 0; i < size; i++) {
+                long ptr = reader.readLong();
+
+                Object nativeJob = reader.readBoolean() ? reader.readObjectDetached() : null;
+
+                PlatformJob job = ctx.createJob(this, ptr, nativeJob);
+
+                UUID jobNodeId = reader.readUuid();
+
+                assert jobNodeId != null;
+
+                ClusterNode jobNode = ctx.kernalContext().discovery().node(jobNodeId);
+
+                if (jobNode == null) {
+                    // Special case when node has left the grid at this point.
+                    // We expect task processor to perform necessary failover.
+                    for (ClusterNode node : nodes) {
+                        if (node.id().equals(jobNodeId)) {
+                            jobNode = node;
+
+                            break;
+                        }
+                    }
+
+                    assert jobNode != null;
+                }
+
+                map.put(job, jobNode);
+            }
+
+            return map;
+        }
+        else
+            throw new IgniteException(reader.readString());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrap.java
new file mode 100644
index 0000000..d066296
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrap.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cpp;
+
+import org.apache.ignite.internal.processors.platform.PlatformAbstractBootstrap;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractConfigurationClosure;
+
+/**
+ * Platform C++ bootstrap.
+ */
+public class PlatformCppBootstrap extends PlatformAbstractBootstrap {
+    /** {@inheritDoc} */
+    @Override protected PlatformAbstractConfigurationClosure closure(long envPtr) {
+        return new PlatformCppConfigurationClosure(envPtr);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrapFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrapFactory.java
new file mode 100644
index 0000000..4933713
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cpp/PlatformCppBootstrapFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cpp;
+
+import org.apache.ignite.internal.processors.platform.PlatformBootstrap;
+import org.apache.ignite.internal.processors.platform.PlatformBootstrapFactory;
+
+/**
+ * Platform C++ bootstrap factory.
+ */
+public class PlatformCppBootstrapFactory implements PlatformBootstrapFactory {
+    /** Bootstrap ID. */
+    public static final int ID = 2;
+
+    /** {@inheritDoc} */
+    @Override public int id() {
+        return ID;
+    }
+
+    /** {@inheritDoc} */
+    @Override public PlatformBootstrap create() {
+        return new PlatformCppBootstrap();
+    }
+}
\ No newline at end of file


Mime
View raw message