ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [18/40] ignite git commit: IGNITE-4014 Fixed "Transaction hangs if entry processor failed during serialization". This closes #1148.
Date Mon, 17 Oct 2016 17:59:15 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index a96d6eb..d388584 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -1127,16 +1127,16 @@ public class GridClosureProcessor extends GridProcessorAdapter {
                         if (closureBytes == null) {
                             closure = c.job;
 
-                            closureBytes = marsh.marshal(c.job);
+                            closureBytes = U.marshal(marsh, c.job);
                         }
 
                         if (c.job == closure)
-                            c.job = marsh.unmarshal(closureBytes, U.resolveClassLoader(ctx.config()));
+                            c.job = U.unmarshal(marsh, closureBytes, U.resolveClassLoader(ctx.config()));
                         else
-                            c.job = marsh.unmarshal(marsh.marshal(c.job), U.resolveClassLoader(ctx.config()));
+                            c.job = U.unmarshal(marsh, U.marshal(marsh, c.job), U.resolveClassLoader(ctx.config()));
                     }
                     else
-                        job = marsh.unmarshal(marsh.marshal(job), U.resolveClassLoader(ctx.config()));
+                        job = U.unmarshal(marsh, U.marshal(marsh, job), U.resolveClassLoader(ctx.config()));
                 }
                 else
                     hadLocNode = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index ad7ad4f..f078b1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -293,7 +293,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                 if (msg.data() == null && msg.dataBytes() != null) {
                     try {
-                        msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
+                        msg.data(U.unmarshal(marsh, msg.dataBytes(), U.resolveClassLoader(ctx.config())));
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to process message (ignoring): " + msg, e);
@@ -729,7 +729,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                     if (msg.data() == null && msg.dataBytes() != null) {
                         try {
-                            msg.data(marsh.unmarshal(msg.dataBytes(), U.resolveClassLoader(ctx.config())));
+                            msg.data(U.unmarshal(marsh, msg.dataBytes(), U.resolveClassLoader(ctx.config())));
                         }
                         catch (IgniteCheckedException e) {
                             U.error(log, "Failed to process message (ignoring): " + msg, e);
@@ -1316,7 +1316,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         if (!msg.messages() &&
             msg.data() != null &&
             (nodes.size() > 1 || !ctx.localNodeId().equals(F.first(nodes).id())))
-            msg.dataBytes(marsh.marshal(msg.data()));
+            msg.dataBytes(U.marshal(marsh, msg.data()));
 
         for (ClusterNode node : nodes) {
             int cnt = 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
index cdfe0e1..2314f29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestData.java
@@ -95,7 +95,7 @@ class StartRequestData implements Externalizable {
     void p2pMarshal(Marshaller marsh) throws IgniteCheckedException {
         assert marsh != null;
 
-        prjPredBytes = marsh.marshal(prjPred);
+        prjPredBytes = U.marshal(marsh, prjPred);
     }
 
     /**
@@ -109,7 +109,7 @@ class StartRequestData implements Externalizable {
         assert prjPred == null;
         assert prjPredBytes != null;
 
-        prjPred = marsh.unmarshal(prjPredBytes, ldr);
+        prjPred = U.unmarshal(marsh, prjPredBytes, ldr);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index c7c1f5e..7663735 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -90,7 +90,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         if (ctx.config().isDaemon())
             return;
 
-        marshErrBytes = marsh.marshal(new IgniteCheckedException("Failed to marshal response error, " +
+        marshErrBytes = U.marshal(marsh, new IgniteCheckedException("Failed to marshal response error, " +
             "see node log for details."));
 
         flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
@@ -235,7 +235,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
             Object topic;
 
             try {
-                topic = marsh.unmarshal(req.responseTopicBytes(), U.resolveClassLoader(null, ctx.config()));
+                topic = U.unmarshal(marsh, req.responseTopicBytes(), U.resolveClassLoader(null, ctx.config()));
             }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to unmarshal topic from request: " + req, e);
@@ -275,7 +275,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
             StreamReceiver<K, V> updater;
 
             try {
-                updater = marsh.unmarshal(req.updaterBytes(), U.resolveClassLoader(clsLdr, ctx.config()));
+                updater = U.unmarshal(marsh, req.updaterBytes(), U.resolveClassLoader(clsLdr, ctx.config()));
 
                 if (updater != null)
                     ctx.resource().injectGeneric(updater);
@@ -329,7 +329,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         byte[] errBytes;
 
         try {
-            errBytes = err != null ? marsh.marshal(err) : null;
+            errBytes = err != null ? U.marshal(marsh, err) : null;
         }
         catch (Exception e) {
             U.error(log, "Failed to marshal error [err=" + err + ", marshErr=" + e + ']', e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 05e6488..c2f226c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1353,11 +1353,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     if (updaterBytes == null) {
                         assert rcvr != null;
 
-                        updaterBytes = ctx.config().getMarshaller().marshal(rcvr);
+                        updaterBytes = U.marshal(ctx, rcvr);
                     }
 
                     if (topicBytes == null)
-                        topicBytes = ctx.config().getMarshaller().marshal(topic);
+                        topicBytes = U.marshal(ctx, topic);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to marshal (request will not be sent).", e);
@@ -1488,7 +1488,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 try {
                     GridPeerDeployAware jobPda0 = jobPda;
 
-                    err = ctx.config().getMarshaller().unmarshal(
+                    err = U.unmarshal(ctx,
                         errBytes,
                         U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config()));
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
index bca4592..275e7eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -96,7 +97,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
         super.prepareMarshal(marsh);
 
         if (err != null && errBytes == null)
-            errBytes = marsh.marshal(err);
+            errBytes = U.marshal(marsh, err);
     }
 
     /** {@inheritDoc} */
@@ -104,7 +105,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
         super.finishUnmarshal(marsh, ldr);
 
         if (errBytes != null && err == null)
-            err = marsh.unmarshal(errBytes, ldr);
+            err = U.unmarshal(marsh, errBytes, ldr);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
index e59b257..3224c20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteMessage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -95,7 +96,7 @@ public class IgfsDeleteMessage extends IgfsCommunicationMessage {
         super.prepareMarshal(marsh);
 
         if (err != null)
-            errBytes = marsh.marshal(err);
+            errBytes = U.marshal(marsh, err);
     }
 
     /** {@inheritDoc} */
@@ -103,7 +104,7 @@ public class IgfsDeleteMessage extends IgfsCommunicationMessage {
         super.finishUnmarshal(marsh, ldr);
 
         if (errBytes != null)
-            err = marsh.unmarshal(errBytes, ldr);
+            err = U.unmarshal(marsh, errBytes, ldr);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
index 72b5a19..9752411 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPaths.java
@@ -74,7 +74,7 @@ public class IgfsPaths implements Externalizable {
         else {
             ByteArrayOutputStream out = new ByteArrayOutputStream();
 
-            new JdkMarshaller().marshal(payload, out);
+            U.marshal(new JdkMarshaller(), payload, out);
 
             payloadBytes = out.toByteArray();
         }
@@ -105,7 +105,7 @@ public class IgfsPaths implements Externalizable {
         else {
             ByteArrayInputStream in = new ByteArrayInputStream(payloadBytes);
 
-            return new JdkMarshaller().unmarshal(in, clsLdr);
+            return U.unmarshal(new JdkMarshaller(), in, clsLdr);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 563a3d8..2f04402 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -407,7 +407,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
         boolean loc = ctx.localNodeId().equals(taskNode.id()) && !ctx.config().isMarshalLocalJobs();
 
         GridTaskSessionRequest req = new GridTaskSessionRequest(ses.getId(), ses.getJobId(),
-            loc ? null : marsh.marshal(attrs), attrs);
+            loc ? null : U.marshal(marsh, attrs), attrs);
 
         Object topic = TOPIC_TASK.topic(ses.getJobId(), ctx.discovery().localNode().id());
 
@@ -521,7 +521,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
             ctx.io().send(taskNode, TOPIC_JOB_SIBLINGS,
                 new GridJobSiblingsRequest(ses.getId(),
                     loc ? topic : null,
-                    loc ? null : marsh.marshal(topic)),
+                    loc ? null : U.marshal(marsh, topic)),
                 SYSTEM_POOL);
 
             // 4. Listen to discovery events.
@@ -1028,7 +1028,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
                             if (siblings0 == null) {
                                 assert req.getSiblingsBytes() != null;
 
-                                siblings0 = marsh.unmarshal(req.getSiblingsBytes(), U.resolveClassLoader(ctx.config()));
+                                siblings0 = U.unmarshal(marsh, req.getSiblingsBytes(), U.resolveClassLoader(ctx.config()));
                             }
 
                             siblings = new ArrayList<>(siblings0);
@@ -1040,7 +1040,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
                             sesAttrs = req.getSessionAttributes();
 
                             if (sesAttrs == null)
-                                sesAttrs = marsh.unmarshal(req.getSessionAttributesBytes(),
+                                sesAttrs = U.unmarshal(marsh, req.getSessionAttributesBytes(),
                                     U.resolveClassLoader(dep.classLoader(), ctx.config()));
                         }
 
@@ -1068,7 +1068,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
                         Map<? extends Serializable, ? extends Serializable> jobAttrs = req.getJobAttributes();
 
                         if (jobAttrs == null)
-                            jobAttrs = marsh.unmarshal(req.getJobAttributesBytes(),
+                            jobAttrs = U.unmarshal(marsh, req.getJobAttributesBytes(),
                                 U.resolveClassLoader(dep.classLoader(), ctx.config()));
 
                         jobCtx = new GridJobContextImpl(ctx, req.getJobId(), jobAttrs);
@@ -1343,11 +1343,11 @@ public class GridJobProcessor extends GridProcessorAdapter {
                 locNodeId,
                 req.getSessionId(),
                 req.getJobId(),
-                loc ? null : marsh.marshal(ex),
+                loc ? null : U.marshal(marsh, ex),
                 ex,
-                loc ? null : marsh.marshal(null),
+                loc ? null : U.marshal(marsh, null),
                 null,
-                loc ? null : marsh.marshal(null),
+                loc ? null : U.marshal(marsh, null),
                 null,
                 false,
                 null);
@@ -1439,7 +1439,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
             boolean loc = ctx.localNodeId().equals(nodeId) && !ctx.config().isMarshalLocalJobs();
 
             Map<?, ?> attrs = loc ? req.getAttributes() :
-                (Map<?, ?>)marsh.unmarshal(req.getAttributesBytes(),
+                (Map<?, ?>)U.unmarshal(marsh, req.getAttributesBytes(),
                     U.resolveClassLoader(ses.getClassLoader(), ctx.config()));
 
             if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 16fadaf..8169eb1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -421,7 +421,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
 
         try {
             if (job == null) {
-                job = marsh.unmarshal(jobBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+                job = U.unmarshal(marsh, jobBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
 
                 // No need to hold reference any more.
                 jobBytes = null;
@@ -804,11 +804,11 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                                 ctx.localNodeId(),
                                 ses.getId(),
                                 ses.getJobId(),
-                                loc ? null : marsh.marshal(ex),
+                                loc ? null : U.marshal(marsh, ex),
                                 loc ? ex : null,
-                                loc ? null: marsh.marshal(res),
+                                loc ? null: U.marshal(marsh, res),
                                 loc ? res : null,
-                                loc ? null : marsh.marshal(attrs),
+                                loc ? null : U.marshal(marsh, attrs),
                                 loc ? attrs : null,
                                 isCancelled(),
                                 retry ? ctx.cache().context().exchange().readyAffinityVersion() : null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
index d9d4421..b91e9ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java
@@ -108,7 +108,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
     private byte[] keyBytes(KeyCacheObject key, @Nullable byte[] keyBytes) throws IgniteCheckedException {
         assert key != null;
 
-        return keyBytes != null ? keyBytes : marsh.marshal(key);
+        return keyBytes != null ? keyBytes : U.marshal(marsh, key);
     }
 
     /**
@@ -212,7 +212,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter {
         if (valBytes == null)
             return null;
 
-        return marsh.unmarshal(valBytes, U.resolveClassLoader(ldr, ctx.config()));
+        return U.unmarshal(marsh, valBytes, U.resolveClassLoader(ldr, ctx.config()));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
index 2749e96..947435c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java
@@ -135,14 +135,13 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
                             res.error(err.getMessage());
                         else {
                             res.result(desc.result());
-                            res.resultBytes(ctx.config().getMarshaller().marshal(desc.result()));
+                            res.resultBytes(U.marshal(ctx, desc.result()));
                         }
                     }
                     else
                         res.found(false);
 
-                    Object topic = ctx.config().getMarshaller().unmarshal(req.topicBytes(),
-                        U.resolveClassLoader(ctx.config()));
+                    Object topic = U.unmarshal(ctx, req.topicBytes(), U.resolveClassLoader(ctx.config()));
 
                     ctx.io().send(nodeId, topic, res, SYSTEM_POOL);
                 }
@@ -440,8 +439,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
                     res = (GridTaskResultResponse)msg;
 
                 try {
-                    res.result(ctx.config().getMarshaller().unmarshal(res.resultBytes(),
-                        U.resolveClassLoader(ctx.config())));
+                    res.result(U.unmarshal(ctx, res.resultBytes(), U.resolveClassLoader(ctx.config())));
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to unmarshal task result: " + res, e);
@@ -494,7 +492,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter {
 
             // 2. Send message.
             try {
-                byte[] topicBytes = ctx.config().getMarshaller().marshal(topic);
+                byte[] topicBytes = U.marshal(ctx, topic);
 
                 ctx.io().send(taskNode, TOPIC_REST, new GridTaskResultRequest(taskId, topic, topicBytes), SYSTEM_POOL);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java
index 5beff75..de9851c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestParser.java
@@ -718,7 +718,7 @@ public class GridTcpRestParser implements GridNioParser {
         assert bytes != null;
 
         if ((flags & SERIALIZED_FLAG) != 0)
-            return jdkMarshaller.unmarshal(bytes, null);
+            return U.unmarshal(jdkMarshaller, bytes, null);
 
         int masked = flags & 0xff00;
 
@@ -800,7 +800,7 @@ public class GridTcpRestParser implements GridNioParser {
             flags |= BYTE_ARR_FLAG;
         }
         else {
-            jdkMarshaller.marshal(obj, out);
+            U.marshal(jdkMarshaller, obj, out);
 
             flags |= SERIALIZED_FLAG;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 7b76c48..527d360 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -464,7 +464,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             LazyServiceConfiguration cfg0;
 
             try {
-                byte[] srvcBytes = marsh.marshal(cfg.getService());
+                byte[] srvcBytes = U.marshal(marsh, cfg.getService());
 
                 cfg0 = new LazyServiceConfiguration(cfg, srvcBytes);
             }
@@ -1144,7 +1144,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         if (cfg instanceof LazyServiceConfiguration) {
             byte[] bytes = ((LazyServiceConfiguration)cfg).serviceBytes();
 
-            Service srvc = m.unmarshal(bytes, U.resolveClassLoader(null, ctx.config()));
+            Service srvc = U.unmarshal(m, bytes, U.resolveClassLoader(null, ctx.config()));
 
             ctx.resource().inject(srvc);
 
@@ -1154,10 +1154,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             Service svc = cfg.getService();
 
             try {
-                byte[] bytes = m.marshal(svc);
+                byte[] bytes = U.marshal(m, svc);
 
-                Service cp = m.unmarshal(bytes,
-                    U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config()));
+                Service cp = U.unmarshal(m, bytes, U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config()));
 
                 ctx.resource().inject(cp);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 6d97229..d32b51c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -914,7 +914,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
                     GridTaskSessionRequest req = new GridTaskSessionRequest(
                         ses.getId(),
                         null,
-                        loc ? null : marsh.marshal(attrs),
+                        loc ? null : U.marshal(marsh, attrs),
                         attrs);
 
                     // Make sure to go through IO manager always, since order
@@ -1030,7 +1030,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
             boolean loc = ctx.localNodeId().equals(nodeId) && !ctx.config().isMarshalLocalJobs();
 
             Map<?, ?> attrs = loc ? msg.getAttributes() :
-                marsh.<Map<?, ?>>unmarshal(msg.getAttributesBytes(),
+                U.<Map<?, ?>>unmarshal(marsh, msg.getAttributesBytes(),
                     U.resolveClassLoader(task.getTask().getClass().getClassLoader(), ctx.config()));
 
             GridTaskSessionImpl ses = task.getSession();
@@ -1306,7 +1306,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
                     if (topic == null) {
                         assert req.topicBytes() != null;
 
-                        topic = marsh.unmarshal(req.topicBytes(), U.resolveClassLoader(ctx.config()));
+                        topic = U.unmarshal(marsh, req.topicBytes(), U.resolveClassLoader(ctx.config()));
                     }
 
                     boolean loc = ctx.localNodeId().equals(nodeId);
@@ -1314,7 +1314,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
                     ctx.io().send(nodeId, topic,
                         new GridJobSiblingsResponse(
                             loc ? siblings : null,
-                            loc ? null : marsh.marshal(siblings)),
+                            loc ? null : U.marshal(marsh, siblings)),
                         SYSTEM_POOL);
                 }
                 catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 452e48c..0be69d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -805,15 +805,15 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                     try {
                         boolean loc = ctx.localNodeId().equals(res.getNodeId()) && !ctx.config().isMarshalLocalJobs();
 
-                        Object res0 = loc ? res.getJobResult() : marsh.unmarshal(res.getJobResultBytes(),
+                        Object res0 = loc ? res.getJobResult() : U.unmarshal(marsh, res.getJobResultBytes(),
                             U.resolveClassLoader(clsLdr, ctx.config()));
 
                         IgniteException ex = loc ? res.getException() :
-                            marsh.<IgniteException>unmarshal(res.getExceptionBytes(),
+                            U.<IgniteException>unmarshal(marsh, res.getExceptionBytes(),
                                 U.resolveClassLoader(clsLdr, ctx.config()));
 
                         Map<Object, Object> attrs = loc ? res.getJobAttributes() :
-                            marsh.<Map<Object, Object>>unmarshal(res.getJobAttributesBytes(),
+                            U.<Map<Object, Object>>unmarshal(marsh, res.getJobAttributesBytes(),
                                 U.resolveClassLoader(clsLdr, ctx.config()));
 
                         jobRes.onResponse(res0, ex, attrs, res.isCancelled());
@@ -1347,16 +1347,16 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         ses.getTaskName(),
                         ses.getUserVersion(),
                         ses.getTaskClassName(),
-                        loc ? null : marsh.marshal(res.getJob()),
+                        loc ? null : U.marshal(marsh, res.getJob()),
                         loc ? res.getJob() : null,
                         ses.getStartTime(),
                         timeout,
                         ses.getTopology(),
-                        loc ? null : marsh.marshal(ses.getJobSiblings()),
+                        loc ? null : U.marshal(marsh, ses.getJobSiblings()),
                         loc ? ses.getJobSiblings() : null,
-                        loc ? null : marsh.marshal(sesAttrs),
+                        loc ? null : U.marshal(marsh, sesAttrs),
                         loc ? sesAttrs : null,
-                        loc ? null : marsh.marshal(jobAttrs),
+                        loc ? null : U.marshal(marsh, jobAttrs),
                         loc ? jobAttrs : null,
                         ses.getCheckpointSpi(),
                         dep.classLoaderId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index f07266b..1e8d648 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -182,6 +182,7 @@ import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.mxbean.IgniteStandardMXBean;
 import org.apache.ignite.internal.processors.cache.GridCacheAttributes;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
@@ -213,6 +214,7 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -9634,6 +9636,203 @@ public abstract class IgniteUtils {
         return t0.compareTo(t1) > 0 ? t0 : t1;
     }
 
+
+    /**
+     * Unmarshals object from the input stream using given class loader.
+     * This method should not close given input stream.
+     * <p/>
+     * This method wraps the marshaller call so that any exception surfaces as {@link IgniteCheckedException}.
+     *
+     * @param <T> Type of unmarshalled object.
+     * @param in Input stream.
+     * @param clsLdr Class loader to use.
+     * @return Unmarshalled object.
+     * @throws IgniteCheckedException If unmarshalling failed.
+     */
+    public static <T> T unmarshal(Marshaller marsh, InputStream in, @Nullable ClassLoader clsLdr)
+        throws IgniteCheckedException {
+        assert marsh != null;
+        assert in != null;
+
+        try {
+            return marsh.unmarshal(in, clsLdr);
+        }
+        catch (IgniteCheckedException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Unmarshals object from the byte array using given class loader.
+     * The call is delegated to the given marshaller.
+     * <p/>
+     * This method wraps the marshaller call so that any exception surfaces as {@link IgniteCheckedException}.
+     *
+     * @param <T> Type of unmarshalled object.
+     * @param marsh Marshaller.
+     * @param arr Byte array.
+     * @param clsLdr Class loader to use.
+     * @return Unmarshalled object.
+     * @throws IgniteCheckedException If unmarshalling failed.
+     */
+    public static <T> T unmarshal(Marshaller marsh, byte[] arr, @Nullable ClassLoader clsLdr)
+        throws IgniteCheckedException {
+        assert marsh != null;
+        assert arr != null;
+
+        try {
+            return marsh.unmarshal(arr, clsLdr);
+        }
+        catch (IgniteCheckedException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Unmarshals object from the byte array using given class loader.
+     * The marshaller is taken from the configuration of the given kernal context.
+     * <p/>
+     * This method wraps the marshaller call so that any exception surfaces as {@link IgniteCheckedException}.
+     *
+     * @param <T> Type of unmarshalled object.
+     * @param ctx Kernal context.
+     * @param arr Byte array.
+     * @param clsLdr Class loader to use.
+     * @return Unmarshalled object.
+     * @throws IgniteCheckedException If unmarshalling failed.
+     */
+    public static <T> T unmarshal(GridKernalContext ctx, byte[] arr, @Nullable ClassLoader clsLdr)
+        throws IgniteCheckedException {
+        assert ctx != null;
+        assert arr != null;
+
+        try {
+            return U.unmarshal(ctx.config().getMarshaller(), arr, clsLdr);
+        }
+        catch (IgniteCheckedException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Unmarshals object from the byte array using given class loader.
+     * The marshaller is taken from the given cache context.
+     * <p/>
+     * This method wraps the marshaller call so that any exception surfaces as {@link IgniteCheckedException}.
+     *
+     * @param <T> Type of unmarshalled object.
+     * @param ctx Cache context.
+     * @param arr Byte array.
+     * @param clsLdr Class loader to use.
+     * @return Unmarshalled object.
+     * @throws IgniteCheckedException If unmarshalling failed.
+     */
+    public static <T> T unmarshal(GridCacheSharedContext ctx, byte[] arr, @Nullable ClassLoader clsLdr)
+        throws IgniteCheckedException {
+        assert ctx != null;
+        assert arr != null;
+
+        try {
+            return U.unmarshal(ctx.marshaller(), arr, clsLdr);
+        }
+        catch (IgniteCheckedException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Marshals object to byte array.
+     * <p/>
+     * This method wraps the marshaller call so that any exception surfaces as {@link IgniteCheckedException}.
+     *
+     * @param marsh Marshaller.
+     * @param obj Object to marshal.
+     * @return Byte array.
+     * @throws IgniteCheckedException If marshalling failed.
+     */
+    public static byte[] marshal(Marshaller marsh, Object obj) throws IgniteCheckedException {
+        assert marsh != null;
+
+        try {
+            return marsh.marshal(obj);
+        }
+        catch (IgniteCheckedException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Marshals object to the given output stream.
+     * <p/>
+     * This method wraps the marshaller call so that any exception surfaces as {@link IgniteCheckedException}.
+     *
+     * @param marsh Marshaller.
+     * @param obj Object to marshal.
+     * @param out Output stream.
+     * @throws IgniteCheckedException If marshalling failed.
+     */
+    public static void marshal(Marshaller marsh, @Nullable Object obj, OutputStream out)
+        throws IgniteCheckedException {
+        assert marsh != null;
+
+        try {
+            marsh.marshal(obj, out);
+        }
+        catch (IgniteCheckedException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Marshals object to byte array using the marshaller from the kernal context configuration.
+     * <p/>
+     * This method wraps the marshaller call so that any exception surfaces as {@link IgniteCheckedException}.
+     *
+     * @param ctx Kernal context.
+     * @param obj Object to marshal.
+     * @return Byte array.
+     * @throws IgniteCheckedException If marshalling failed.
+     */
+    public static byte[] marshal(GridKernalContext ctx, Object obj) throws IgniteCheckedException {
+        assert ctx != null;
+
+        return marshal(ctx.config().getMarshaller(), obj);
+    }
+
+    /**
+     * Marshals object to byte array using the marshaller of the given cache context.
+     * <p/>
+     * This method wraps the marshaller call so that any exception surfaces as {@link IgniteCheckedException}.
+     *
+     * @param ctx Cache context.
+     * @param obj Object to marshal.
+     * @return Byte array.
+     * @throws IgniteCheckedException If marshalling failed.
+     */
+    public static byte[] marshal(GridCacheSharedContext ctx, Object obj) throws IgniteCheckedException {
+        assert ctx != null;
+
+        return marshal(ctx.marshaller(), obj);
+    }
+
     /**
      * Get current Ignite name.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java
index 7a130d3..54172dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/jdk/JdkMarshaller.java
@@ -80,7 +80,7 @@ public class JdkMarshaller extends AbstractNodeNameAwareMarshaller {
 
             objOut.flush();
         }
-        catch (IOException e) {
+        catch (Exception e) {
             throw new IgniteCheckedException("Failed to serialize object: " + obj, e);
         }
         finally{
@@ -119,14 +119,14 @@ public class JdkMarshaller extends AbstractNodeNameAwareMarshaller {
 
             return (T)objIn.readObject();
         }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to deserialize object with given class loader: " + clsLdr, e);
-        }
         catch (ClassNotFoundException e) {
             throw new IgniteCheckedException("Failed to find class with given class loader for unmarshalling " +
                 "(make sure same versions of all classes are available on all nodes or enable peer-class-loading): " +
                 clsLdr, e);
         }
+        catch (Exception e) {
+            throw new IgniteCheckedException("Failed to deserialize object with given class loader: " + clsLdr, e);
+        }
         finally{
             U.closeQuiet(objIn);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java
index 37f7acb..467dddf 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshaller.java
@@ -193,7 +193,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
 
             return objOut.out().array();
         }
-        catch (IOException e) {
+        catch (Exception e) {
             throw new IgniteCheckedException("Failed to serialize object: " + obj, e);
         }
         finally {
@@ -217,14 +217,14 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
 
             return (T)objIn.readObject();
         }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to deserialize object with given class loader: " + clsLdr, e);
-        }
         catch (ClassNotFoundException e) {
             throw new IgniteCheckedException("Failed to find class with given class loader for unmarshalling " +
                 "(make sure same versions of all classes are available on all nodes or enable peer-class-loading): " +
                 clsLdr, e);
         }
+        catch (Exception e) {
+            throw new IgniteCheckedException("Failed to deserialize object with given class loader: " + clsLdr, e);
+        }
         finally {
             OptimizedObjectStreamRegistry.closeIn(objIn);
         }
@@ -246,14 +246,14 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
 
             return (T)objIn.readObject();
         }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to deserialize object with given class loader: " + clsLdr, e);
-        }
         catch (ClassNotFoundException e) {
             throw new IgniteCheckedException("Failed to find class with given class loader for unmarshalling " +
                 "(make sure same version of all classes are available on all nodes or enable peer-class-loading): " +
                 clsLdr, e);
         }
+        catch (Exception e) {
+            throw new IgniteCheckedException("Failed to deserialize object with given class loader: " + clsLdr, e);
+        }
         finally {
             OptimizedObjectStreamRegistry.closeIn(objIn);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsUtils.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsUtils.java
index b1698fa..f8c4130 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/sharedfs/SharedFsUtils.java
@@ -61,7 +61,7 @@ final class SharedFsUtils {
         InputStream in = new FileInputStream(file);
 
         try {
-            return (SharedFsCheckpointData)m.unmarshal(in, U.gridClassLoader());
+            return U.unmarshal(m, in, U.gridClassLoader());
         }
         finally {
             U.close(in, log);
@@ -91,7 +91,7 @@ final class SharedFsUtils {
         try {
             out = new FileOutputStream(file);
 
-            m.marshal(data, out);
+            U.marshal(m, data, out);
         }
         finally {
             U.close(out, log);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 2c85645..2d948da 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -431,7 +431,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         try {
             sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
-                spi.marshaller().marshal(evt)));
+                U.marshal(spi.marshaller(), evt)));
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -701,7 +701,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             Map<String, Object> attrs = new HashMap<>(node.getAttributes());
 
             attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
-                spi.marshaller().marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS)));
+                U.marshal(spi.marshaller(), attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS)));
 
             node.setAttributes(attrs);
         }
@@ -902,7 +902,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         TcpDiscoveryAbstractMessage msg;
 
                         try {
-                            msg = spi.marshaller().unmarshal(in, U.resolveClassLoader(spi.ignite().configuration()));
+                            msg = U.unmarshal(spi.marshaller(), in, U.resolveClassLoader(spi.ignite().configuration()));
                         }
                         catch (IgniteCheckedException e) {
                             if (log.isDebugEnabled())
@@ -1232,7 +1232,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         List<TcpDiscoveryAbstractMessage> msgs = null;
 
                         while (!isInterrupted()) {
-                            TcpDiscoveryAbstractMessage msg = spi.marshaller().unmarshal(in,
+                            TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
                                 U.resolveClassLoader(spi.ignite().configuration()));
 
                             if (msg instanceof TcpDiscoveryClientReconnectMessage) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 135a737..78a5f39 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -744,7 +744,7 @@ class ServerImpl extends TcpDiscoveryImpl {
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
         try {
             msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt,
-                spi.marshaller().marshal(evt)));
+                U.marshal(spi.marshaller(), evt)));
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -827,7 +827,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         Map<String, Object> attrs = new HashMap<>(locNode.attributes());
 
-                        attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, spi.marshaller().marshal(subj));
+                        attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, U.marshal(spi.marshaller(), subj));
                         attrs.remove(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS);
 
                         locNode.setAttributes(attrs);
@@ -1243,7 +1243,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             Map<String, Object> attrs = new HashMap<>(node.getAttributes());
 
             attrs.put(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS,
-                spi.marshaller().marshal(attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS)));
+                U.marshal(spi.marshaller(), attrs.get(IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS)));
 
             node.setAttributes(attrs);
         }
@@ -1266,7 +1266,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (credBytes == null)
                 return null;
 
-            return spi.marshaller().unmarshal(credBytes, null);
+            return U.unmarshal(spi.marshaller(), credBytes, null);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to unmarshal node security credentials: " + node.id(), e);
@@ -2380,7 +2380,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
                     if (msgBytes == null) {
                         try {
-                            msgBytes = spi.marshaller().marshal(msg);
+                            msgBytes = U.marshal(spi.marshaller(), msg);
                         }
                         catch (IgniteCheckedException e) {
                             U.error(log, "Failed to marshal message: " + msg, e);
@@ -2399,7 +2399,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         if (clientMsgWorker.clientNodeId.equals(node.id())) {
                             try {
-                                msg0 = spi.marshaller().unmarshal(msgBytes,
+                                msg0 = U.unmarshal(spi.marshaller(), msgBytes,
                                     U.resolveClassLoader(spi.ignite().configuration()));
 
                                 prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null, null, null);
@@ -3157,7 +3157,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             // Stick in authentication subject to node (use security-safe attributes for copy).
                             Map<String, Object> attrs = new HashMap<>(node.getAttributes());
 
-                            attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, spi.marshaller().marshal(subj));
+                            attrs.put(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT, U.marshal(spi.marshaller(), subj));
 
                             node.setAttributes(attrs);
                         }
@@ -3805,7 +3805,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         else {
                             SecurityContext subj = spi.nodeAuth.authenticateNode(node, cred);
 
-                            SecurityContext coordSubj = spi.marshaller().unmarshal(
+                            SecurityContext coordSubj = U.unmarshal(spi.marshaller(),
                                 node.<byte[]>attribute(IgniteNodeAttributes.ATTR_SECURITY_SUBJECT),
                                 U.resolveClassLoader(spi.ignite().configuration()));
 
@@ -4872,7 +4872,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (nextMsg != null) {
                             try {
                                 TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage(
-                                    getLocalNodeId(), nextMsg, spi.marshaller().marshal(nextMsg));
+                                    getLocalNodeId(), nextMsg, U.marshal(spi.marshaller(), nextMsg));
 
                                 ackMsg.topologyVersion(msg.topologyVersion());
 
@@ -5017,7 +5017,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             msgObj);
 
                         if (msgObj.isMutable())
-                            msg.message(msgObj, spi.marshaller().marshal(msgObj));
+                            msg.message(msgObj, U.marshal(spi.marshaller(), msgObj));
                     }
                     catch (Throwable e) {
                         U.error(log, "Failed to unmarshal discovery custom message.", e);
@@ -5455,7 +5455,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 while (!isInterrupted()) {
                     try {
-                        TcpDiscoveryAbstractMessage msg = spi.marshaller().unmarshal(in,
+                        TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
                             U.resolveClassLoader(spi.ignite().configuration()));
 
                         msg.senderNodeId(nodeId);
@@ -5946,7 +5946,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 byte[] msgBytes = msgT.get2();
 
                 if (msgBytes == null)
-                    msgBytes = spi.marshaller().marshal(msg);
+                    msgBytes = U.marshal(spi.marshaller(), msg);
 
                 if (msg instanceof TcpDiscoveryClientAckResponse) {
                     if (clientVer == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 426eb8e..a8704e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1380,7 +1380,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         IgniteCheckedException err = null;
 
         try {
-            marshaller().marshal(msg, out);
+            U.marshal(marshaller(), msg, out);
         }
         catch (IgniteCheckedException e) {
             err = e;
@@ -1464,7 +1464,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
         try {
             sock.setSoTimeout((int)timeout);
 
-            T res = marshaller().unmarshal(in == null ? sock.getInputStream() : in,
+            T res = U.unmarshal(marshaller(), in == null ? sock.getInputStream() : in,
                 U.resolveClassLoader(ignite.configuration()));
 
             return res;
@@ -1682,7 +1682,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         for (Map.Entry<Integer, Serializable> entry : data.entrySet()) {
             try {
-                byte[] bytes = marshaller().marshal(entry.getValue());
+                byte[] bytes = U.marshal(marshaller(), entry.getValue());
 
                 data0.put(entry.getKey(), bytes);
             }
@@ -1713,7 +1713,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         for (Map.Entry<Integer, byte[]> entry : data.entrySet()) {
             try {
-                Serializable compData = marshaller().unmarshal(entry.getValue(), clsLdr);
+                Serializable compData = U.unmarshal(marshaller(), entry.getValue(), clsLdr);
 
                 data0.put(entry.getKey(), compData);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
index 5bbe90e..e96abe9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
@@ -690,7 +690,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
         private AddressResponse(Collection<InetSocketAddress> addrs) throws IgniteCheckedException {
             this.addrs = addrs;
 
-            byte[] addrsData = marsh.marshal(addrs);
+            byte[] addrsData = U.marshal(marsh, addrs);
             data = new byte[U.IGNITE_HEADER.length + addrsData.length];
 
             if (data.length > MAX_DATA_LENGTH)
@@ -709,7 +709,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
 
             this.data = data;
 
-            addrs = marsh.unmarshal(Arrays.copyOfRange(data, U.IGNITE_HEADER.length, data.length), null);
+            addrs = U.unmarshal(marsh, Arrays.copyOfRange(data, U.IGNITE_HEADER.length, data.length), null);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
index c0e39d3..219c0ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp.messages;
 import java.util.UUID;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.jetbrains.annotations.NotNull;
@@ -77,7 +78,7 @@ public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage
      */
     @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr) throws Throwable {
         if (msg == null) {
-            msg = marsh.unmarshal(msgBytes, ldr);
+            msg = U.unmarshal(marsh, msgBytes, ldr);
 
             assert msg != null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index 222f463..f0ac7bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -596,7 +596,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
 
         if (keyBytes == null) {
             try {
-                keyBytes = ignite.configuration().getMarshaller().marshal(key.key());
+                keyBytes = U.marshal(ignite.configuration().getMarshaller(), key.key());
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteSpiException("Failed to marshal key: " + key.key(), e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index a5b89e4..d1c8d19 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.stream.StreamAdapter;
@@ -231,7 +232,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
         /** {@inheritDoc} */
         @Override public T convert(byte[] msg) {
             try {
-                return marsh.unmarshal(msg, null);
+                return U.unmarshal(marsh, msg, null);
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheEntryProcessorExternalizableFailedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheEntryProcessorExternalizableFailedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheEntryProcessorExternalizableFailedTest.java
new file mode 100644
index 0000000..031774e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheEntryProcessorExternalizableFailedTest.java
@@ -0,0 +1,588 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
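+ * Tests that a transaction fails, rather than hangs, when an entry processor throws during (de)serialization.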
+ */
+public class CacheEntryProcessorExternalizableFailedTest extends GridCommonAbstractTest {
+    /** */
+    private static final int EXPECTED_VALUE = 42;
+
+    /** */
+    private static final int WRONG_VALUE = -1;
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 3;
+
+    /** */
+    public static final int ITERATION_CNT = 1;
+
+    /** */
+    public static final int KEYS = 10;
+
+    /** */
+    private boolean client;
+
+    /** */
+    private boolean failOnWrite = false;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(getServerNodeCount());
+
+        client = true;
+
+        startGrid(getServerNodeCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        failOnWrite = false;
+    }
+
+    /**
+     * @return Server nodes.
+     */
+    private int getServerNodeCount() {
+        return NODES;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticFullSync() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 2);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+        failOnWrite = true;
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticOnePhaseCommit() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 1);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+        failOnWrite = true;
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticOnePhaseCommitWithNearCache() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 1)
+            .setNearConfiguration(new NearCacheConfiguration());
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+        failOnWrite = true;
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticOnePhaseCommitFullSync() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 1);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+        failOnWrite = true;
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticOnePhaseCommitFullSyncWithNearCache() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 1)
+            .setNearConfiguration(new NearCacheConfiguration());
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+        failOnWrite = true;
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimistic() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 2);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+        failOnWrite = true;
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticWithNearCache() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 2)
+            .setNearConfiguration(new NearCacheConfiguration());
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+        failOnWrite = true;
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticFullSync() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 2);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+
+        failOnWrite = true;
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticOnePhaseCommit() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 1);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+        failOnWrite = true;
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticOnePhaseCommitFullSync() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 1);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+        failOnWrite = true;
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticOnePhaseCommitFullSyncWithNearCache() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 1)
+            .setNearConfiguration(new NearCacheConfiguration());
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+        failOnWrite = true; // Repeat with the processor that fails in writeExternal().
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimistic() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(PRIMARY_SYNC, 2);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+        failOnWrite = true; // Repeat with the processor that fails in writeExternal().
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticFullSyncWithNearCache() throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(FULL_SYNC, 2).setNearConfiguration(new NearCacheConfiguration());
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+
+        failOnWrite = true; // Repeat with the processor that fails in writeExternal().
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, SERIALIZABLE);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, READ_COMMITTED);
+
+        doTestInvokeTest(ccfg, OPTIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * Verifies that explicit commits and implicit invokes fail on the broken processor and leave the value intact.
+     * @param ccfg Cache configuration.
+     * @param txConcurrency Transaction concurrency.
+     * @param txIsolation Transaction isolation.
+     * @throws Exception If failed.
+     */
+    private void doTestInvokeTest(CacheConfiguration ccfg, TransactionConcurrency txConcurrency,
+        TransactionIsolation txIsolation) throws Exception {
+        IgniteEx cln = grid(getServerNodeCount());
+
+        grid(0).createCache(ccfg);
+
+        IgniteCache clnCache;
+
+        if (ccfg.getNearConfiguration() != null)
+            clnCache = cln.createNearCache(ccfg.getName(), ccfg.getNearConfiguration());
+        else
+            clnCache = cln.cache(ccfg.getName());
+
+        putKeys(clnCache, EXPECTED_VALUE);
+
+        try {
+            // Explicit tx.
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                try (final Transaction tx = cln.transactions().txStart(txConcurrency, txIsolation)) {
+                    putKeys(clnCache, WRONG_VALUE);
+
+                    clnCache.invoke(KEYS, createEntryProcessor());
+
+                    GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+                        @Override public Object call() throws Exception {
+                            tx.commit();
+
+                            return null;
+                        }
+                    }, UnsupportedOperationException.class);
+                }
+
+                assertNull(cln.transactions().tx());
+
+                checkKeys(clnCache, EXPECTED_VALUE);
+            }
+
+            // From affinity node.
+            Ignite grid = grid(ThreadLocalRandom.current().nextInt(NODES));
+
+            final IgniteCache cache = grid.cache(ccfg.getName());
+
+            // Explicit tx started on the affinity node.
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                try (final Transaction tx = grid.transactions().txStart(txConcurrency, txIsolation)) {
+                    putKeys(cache, WRONG_VALUE);
+
+                    cache.invoke(KEYS, createEntryProcessor());
+
+                    GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+                        @Override public Object call() throws Exception {
+                            tx.commit();
+
+                            return null;
+                        }
+                    }, UnsupportedOperationException.class);
+                }
+
+                assertNull(grid.transactions().tx());
+
+                checkKeys(cache, EXPECTED_VALUE);
+            }
+
+            final IgniteCache clnCache0 = clnCache;
+
+            // Implicit tx.
+            for (int i = 0; i < ITERATION_CNT; i++) {
+                GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        clnCache0.invoke(KEYS, createEntryProcessor());
+
+                        return null;
+                    }
+                }, UnsupportedOperationException.class);
+
+                assertNull(cln.transactions().tx());
+            }
+
+            checkKeys(clnCache, EXPECTED_VALUE);
+        }
+        finally {
+            grid(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @return Entry processor.
+     */
+    @NotNull private EntryProcessor<Integer, Integer, Integer> createEntryProcessor() {
+        return failOnWrite ? new ExternalizableFailedWriteEntryProcessor() :
+            new ExternalizableFailedReadEntryProcessor();
+    }
+
+    /**
+     * @param cache Cache.
+     * @param val Value.
+     */
+    private void putKeys(IgniteCache cache, int val) {
+        cache.put(KEYS, val);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param expVal Expected value.
+     */
+    private void checkKeys(IgniteCache cache, int expVal) {
+        assertEquals(expVal, cache.get(KEYS));
+    }
+
+    /**
+     * @return Transactional cache configuration named after, and configured with, {@code wrMode} and {@code backup}.
+     */
+    private CacheConfiguration cacheConfiguration(CacheWriteSynchronizationMode wrMode, int backup) {
+        return new CacheConfiguration("test-cache-" + wrMode + "-" + backup)
+            .setAtomicityMode(TRANSACTIONAL)
+            .setWriteSynchronizationMode(wrMode)
+            .setBackups(backup);
+    }
+
+    /**
+     *
+     */
+    private static class ExternalizableFailedWriteEntryProcessor implements EntryProcessor<Integer, Integer, Integer>,
+        Externalizable{
+        /** */
+        public ExternalizableFailedWriteEntryProcessor() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments)
+            throws EntryProcessorException {
+            entry.setValue(42);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            // No-op.
+        }
+    }
+
+    /**
+     *
+     */
+    private static class ExternalizableFailedReadEntryProcessor implements EntryProcessor<Integer, Integer, Integer>,
+        Externalizable {
+        /** */
+        public ExternalizableFailedReadEntryProcessor() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments)
+            throws EntryProcessorException {
+            entry.setValue(42);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            throw new UnsupportedOperationException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 5ad4cb8..388c3a3 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -140,6 +140,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheGet
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearTxExceptionSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxExceptionSelfTest;
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxExceptionSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorExternalizableFailedTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheEntryProcessorNonSerializableTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSelfTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerImplSelfTest;
@@ -191,6 +192,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheAtomicLocalWithStoreInvokeTest.class);
         suite.addTestSuite(IgniteCacheTxInvokeTest.class);
         suite.addTestSuite(CacheEntryProcessorNonSerializableTest.class);
+        suite.addTestSuite(CacheEntryProcessorExternalizableFailedTest.class);
         suite.addTestSuite(IgniteCacheEntryProcessorCallTest.class);
         GridTestUtils.addTestIfNeeded(suite, CacheEntryProcessorCopySelfTest.class, ignoredTests);
         suite.addTestSuite(IgniteCacheTxNearEnabledInvokeTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a6cfce6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
index 3f79469..eeca564 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
 import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
 import org.apache.ignite.internal.util.nio.GridNioFuture;
 import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
 
 /**
@@ -29,15 +30,15 @@ import org.apache.ignite.marshaller.Marshaller;
  */
 public class HadoopMarshallerFilter extends GridNioFilterAdapter {
     /** Marshaller. */
-    private Marshaller marshaller;
+    private Marshaller marsh;
 
     /**
-     * @param marshaller Marshaller to use.
+     * @param marsh Marshaller to use.
      */
-    public HadoopMarshallerFilter(Marshaller marshaller) {
+    public HadoopMarshallerFilter(Marshaller marsh) {
         super("HadoopMarshallerFilter");
 
-        this.marshaller = marshaller;
+        this.marsh = marsh;
     }
 
     /** {@inheritDoc} */
@@ -59,14 +60,14 @@ public class HadoopMarshallerFilter extends GridNioFilterAdapter {
     @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
         assert msg instanceof HadoopMessage : "Invalid message type: " + msg;
 
-        return proceedSessionWrite(ses, marshaller.marshal(msg));
+        return proceedSessionWrite(ses, U.marshal(marsh, msg));
     }
 
     @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
         assert msg instanceof byte[];
 
         // Always unmarshal with system classloader.
-        proceedMessageReceived(ses, marshaller.unmarshal((byte[])msg, null));
+        proceedMessageReceived(ses, U.unmarshal(marsh, (byte[])msg, null));
     }
 
     /** {@inheritDoc} */


Mime
View raw message