ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/3] incubator-ignite git commit: # ignite-51
Date Sun, 01 Mar 2015 09:08:19 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7db0122/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
index 71e0ea0..e7c401c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
@@ -41,7 +41,6 @@ import org.apache.ignite.lang.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
-import java.io.*;
 import java.util.*;
 import java.util.Map.*;
 import java.util.concurrent.*;
@@ -74,14 +73,6 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
     /** Cache name ({@code null} for default cache). */
     private final String cacheName;
 
-    /** Portable enabled flag. */
-    private final boolean portableEnabled;
-
-    /**
-     *  If {@code true} then data will be transferred in compact format (only keys and values).
-     *  Otherwise full map entry will be transferred (this is requires by DR internal logic).
-     */
-    private final boolean compact;
 
     /** Per-node buffer size. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
@@ -106,6 +97,12 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
     /** Context. */
     private final GridKernalContext ctx;
 
+    /** */
+    private final GridPortableProcessor cacheObjProc;
+
+    /** */
+    private final CacheObjectContext cacheObjCtx;
+
     /** Communication topic for responses. */
     private final Object topic;
 
@@ -160,21 +157,19 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
      * @param ctx Grid kernal context.
      * @param cacheName Cache name.
      * @param flushQ Flush queue.
-     * @param compact If {@code true} data is transferred in compact mode (only keys and values).
-     *                Otherwise full map entry will be transferred (this is required by DR internal logic).
      */
     public IgniteDataLoaderImpl(
         final GridKernalContext ctx,
         @Nullable final String cacheName,
-        DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ,
-        boolean compact
+        DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ
     ) {
         assert ctx != null;
 
         this.ctx = ctx;
+        this.cacheObjProc = ctx.portable();
+        this.cacheObjCtx = new CacheObjectContext(ctx);
         this.cacheName = cacheName;
         this.flushQ = flushQ;
-        this.compact = compact;
 
         log = U.logger(ctx, logRef, IgniteDataLoaderImpl.class);
 
@@ -183,8 +178,6 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
         if (node == null)
             throw new IllegalStateException("Cache doesn't exist: " + cacheName);
 
-        portableEnabled = ctx.portable().portableEnabled(node, cacheName);
-
         discoLsnr = new GridLocalEventListener() {
             @Override public void onEvent(Event evt) {
                 assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
@@ -371,6 +364,18 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
     @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
         A.notEmpty(entries, "entries");
 
+        // TODO IGNITE-51.
+        Collection<? extends IgniteDataLoaderEntry> entries0 = F.viewReadOnly(entries, new C1<Entry<K, V>, IgniteDataLoaderEntry>() {
+            @Override public IgniteDataLoaderEntry apply(Entry<K, V> e) {
+                KeyCacheObject key = cacheObjProc.toCacheKeyObject(null, e.getKey());
+                CacheObject val = cacheObjProc.toCacheObject(null, e.getValue());
+
+                return new IgniteDataLoaderEntry(key, val);
+            }
+        });
+
+        return addDataInternal(entries0);
+        /*
         enterBusy();
 
         try {
@@ -380,7 +385,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
 
             activeFuts.add(resFut);
 
-            Collection<K> keys = null;
+            Collection<KeyCacheObject> keys = null;
 
             if (entries.size() > 1) {
                 keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
@@ -399,6 +404,50 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
         finally {
             leaveBusy();
         }
+        */
+    }
+
+    /**
+     * @param key Key.
+     * @param val Value.
+     */
+    public IgniteFuture<?> addDataInternal(KeyCacheObject key, CacheObject val) {
+        return addDataInternal(Collections.singleton(new IgniteDataLoaderEntry(key, val)));
+    }
+
+    /**
+     * @param entries Entries.
+     * @return Future.
+     */
+    public IgniteFuture<?> addDataInternal(Collection<? extends IgniteDataLoaderEntry> entries) {
+        enterBusy();
+
+        try {
+            GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx);
+
+            resFut.listenAsync(rmvActiveFut);
+
+            activeFuts.add(resFut);
+
+            Collection<KeyCacheObject> keys = null;
+
+            if (entries.size() > 1) {
+                keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
+
+                for (IgniteDataLoaderEntry entry : entries)
+                    keys.add(entry.getKey());
+            }
+
+            load0(entries, resFut, keys, 0);
+
+            return new IgniteFutureImpl<>(resFut);
+        }
+        catch (IgniteException e) {
+            return new IgniteFinishedFutureImpl<>(ctx, e);
+        }
+        finally {
+            leaveBusy();
+        }
     }
 
     /** {@inheritDoc} */
@@ -412,7 +461,10 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
     @Override public IgniteFuture<?> addData(K key, V val) {
         A.notNull(key, "key");
 
-        return addData(new Entry0<>(key, val));
+        KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(null, key);
+        CacheObject val0 = cacheObjProc.toCacheObject(null, val);
+
+        return addDataInternal(Collections.singleton(new IgniteDataLoaderEntry(key0, val0)));
     }
 
     /** {@inheritDoc} */
@@ -427,9 +479,9 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
      * @param remaps Remaps count.
      */
     private void load0(
-        Collection<? extends Map.Entry<K, V>> entries,
+        Collection<? extends IgniteDataLoaderEntry> entries,
         final GridFutureAdapter<Object> resFut,
-        @Nullable final Collection<K> activeKeys,
+        @Nullable final Collection<KeyCacheObject> activeKeys,
         final int remaps
     ) {
         assert entries != null;
@@ -440,20 +492,22 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
             return;
         }
 
-        Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>();
+        Map<ClusterNode, Collection<IgniteDataLoaderEntry>> mappings = new HashMap<>();
 
         boolean initPda = ctx.deploy().enabled() && jobPda == null;
 
-        for (Map.Entry<K, V> entry : entries) {
+        for (IgniteDataLoaderEntry entry : entries) {
             List<ClusterNode> nodes;
 
             try {
-                K key = entry.getKey();
+                KeyCacheObject key = entry.getKey();
 
                 assert key != null;
 
                 if (initPda) {
-                    jobPda = new DataLoaderPda(key, entry.getValue(), updater);
+                    jobPda = new DataLoaderPda(key.value(null, false),
+                        CU.value(entry.getValue(), null, false),
+                        updater);
 
                     initPda = false;
                 }
@@ -475,7 +529,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
             }
 
             for (ClusterNode node : nodes) {
-                Collection<Map.Entry<K, V>> col = mappings.get(node);
+                Collection<IgniteDataLoaderEntry> col = mappings.get(node);
 
                 if (col == null)
                     mappings.put(node, col = new ArrayList<>());
@@ -484,7 +538,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
             }
         }
 
-        for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) {
+        for (final Map.Entry<ClusterNode, Collection<IgniteDataLoaderEntry>> e : mappings.entrySet()) {
             final UUID nodeId = e.getKey().id();
 
             Buffer buf = bufMappings.get(nodeId);
@@ -496,7 +550,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
                     buf = old;
             }
 
-            final Collection<Map.Entry<K, V>> entriesForNode = e.getValue();
+            final Collection<IgniteDataLoaderEntry> entriesForNode = e.getValue();
 
             IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> t) {
@@ -504,7 +558,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
                         t.get();
 
                         if (activeKeys != null) {
-                            for (Map.Entry<K, V> e : entriesForNode)
+                            for (IgniteDataLoaderEntry e : entriesForNode)
                                 activeKeys.remove(e.getKey());
 
                             if (activeKeys.isEmpty())
@@ -559,7 +613,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
      * @return Nodes to send requests to.
      * @throws IgniteCheckedException If failed.
      */
-    private List<ClusterNode> nodes(K key) throws IgniteCheckedException {
+    private List<ClusterNode> nodes(KeyCacheObject key) throws IgniteCheckedException {
         GridAffinityProcessor aff = ctx.affinity();
 
         return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) :
@@ -793,7 +847,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
         private final Collection<IgniteInternalFuture<Object>> locFuts;
 
         /** Buffered entries. */
-        private List<Map.Entry<K, V>> entries;
+        private List<IgniteDataLoaderEntry> entries;
 
         /** */
         @GridToStringExclude
@@ -846,9 +900,9 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
          * @throws IgniteInterruptedCheckedException If failed.
          * @return Future for operation.
          */
-        @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> newEntries,
+        @Nullable GridFutureAdapter<?> update(Iterable<IgniteDataLoaderEntry> newEntries,
             IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
-            List<Map.Entry<K, V>> entries0 = null;
+            List<IgniteDataLoaderEntry> entries0 = null;
             GridFutureAdapter<Object> curFut0;
 
             synchronized (this) {
@@ -856,7 +910,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
 
                 curFut0.listenAsync(lsnr);
 
-                for (Map.Entry<K, V> entry : newEntries)
+                for (IgniteDataLoaderEntry entry : newEntries)
                     entries.add(entry);
 
                 if (entries.size() >= bufSize) {
@@ -881,7 +935,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
         /**
          * @return Fresh collection with some space for outgrowth.
          */
-        private List<Map.Entry<K, V>> newEntries() {
+        private List<IgniteDataLoaderEntry> newEntries() {
             return new ArrayList<>((int)(bufSize * 1.2));
         }
 
@@ -891,7 +945,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
          * @throws IgniteInterruptedCheckedException If thread has been interrupted.
          */
         @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
-            List<Map.Entry<K, V>> entries0 = null;
+            List<IgniteDataLoaderEntry> entries0 = null;
             GridFutureAdapter<Object> curFut0 = null;
 
             synchronized (this) {
@@ -954,7 +1008,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
          * @param curFut Current future.
          * @throws IgniteInterruptedCheckedException If interrupted.
          */
-        private void submit(final Collection<Map.Entry<K, V>> entries, final GridFutureAdapter<Object> curFut)
+        private void submit(final Collection<IgniteDataLoaderEntry> entries, final GridFutureAdapter<Object> curFut)
             throws IgniteInterruptedCheckedException {
             assert entries != null;
             assert !entries.isEmpty();
@@ -966,7 +1020,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
 
             if (isLocNode) {
                 fut = ctx.closure().callLocalSafe(
-                    new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
+                    new GridDataLoadUpdateJob(ctx, log, cacheName, entries, false, skipStore, updater), false);
 
                 locFuts.add(fut);
 
@@ -986,15 +1040,15 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
                 });
             }
             else {
-                byte[] entriesBytes;
-
                 try {
-                    if (compact) {
-                        entriesBytes = ctx.config().getMarshaller()
-                            .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null));
+                    for (IgniteDataLoaderEntry e : entries) {
+                        e.getKey().prepareMarshal(cacheObjCtx);
+
+                        CacheObject val = e.getValue();
+
+                        if (val != null)
+                            val.prepareMarshal(cacheObjCtx);
                     }
-                    else
-                        entriesBytes = ctx.config().getMarshaller().marshal(entries);
 
                     if (updaterBytes == null) {
                         assert updater != null;
@@ -1048,7 +1102,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
                     topicBytes,
                     cacheName,
                     updaterBytes,
-                    entriesBytes,
+                    entries,
                     true,
                     skipStore,
                     dep != null ? dep.deployMode() : null,
@@ -1246,181 +1300,38 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay
     }
 
     /**
-     * Entry.
-     */
-    private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private K key;
-
-        /** */
-        private V val;
-
-        /**
-         * @param key Key.
-         * @param val Value.
-         */
-        private Entry0(K key, @Nullable V val) {
-            assert key != null;
-
-            this.key = key;
-            this.val = val;
-        }
-
-        /**
-         * For {@link Externalizable}.
-         */
-        @SuppressWarnings("UnusedDeclaration")
-        public Entry0() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public K getKey() {
-            return key;
-        }
-
-        /** {@inheritDoc} */
-        @Override public V getValue() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public V setValue(V val) {
-            V old = this.val;
-
-            this.val = val;
-
-            return old;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeObject(key);
-            out.writeObject(val);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            key = (K)in.readObject();
-            val = (V)in.readObject();
-        }
-    }
-
-    /**
-     * Wrapper list with special compact serialization of map entries.
-     */
-    private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**  Wrapped delegate. */
-        private Collection<Map.Entry<K, V>> delegate;
-
-        /** Optional portable processor for converting values. */
-        private GridPortableProcessor portable;
-
-        /**
-         * @param delegate Delegate.
-         * @param portable Portable processor.
-         */
-        private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) {
-            this.delegate = delegate;
-            this.portable = portable;
-        }
-
-        /**
-         * For {@link Externalizable}.
-         */
-        public Entries0() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<Entry<K, V>> iterator() {
-            return delegate.iterator();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int size() {
-            return delegate.size();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeInt(delegate.size());
-
-            boolean portableEnabled = portable != null;
-
-            for (Map.Entry<K, V> entry : delegate) {
-                if (portableEnabled) {
-                    out.writeObject(portable.marshalToPortable(entry.getKey()));
-                    out.writeObject(portable.marshalToPortable(entry.getValue()));
-                }
-                else {
-                    out.writeObject(entry.getKey());
-                    out.writeObject(entry.getValue());
-                }
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            int sz = in.readInt();
-
-            delegate = new ArrayList<>(sz);
-
-            for (int i = 0; i < sz; i++) {
-                Object k = in.readObject();
-                Object v = in.readObject();
-
-                delegate.add(new Entry0<>((K)k, (V)v));
-            }
-        }
-    }
-
-    /**
      * Isolated updater which only loads entry initial value.
      */
-    private static class IsolatedUpdater<K, V> implements Updater<K, V> {
+    private static class IsolatedUpdater implements Updater<KeyCacheObject, CacheObject>,
+        GridDataLoadCacheUpdaters.InternalUpdater {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
-            IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache;
+        @Override public void update(IgniteCache<KeyCacheObject, CacheObject> cache,
+            Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) {
+            IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = (IgniteCacheProxy<KeyCacheObject, CacheObject>)cache;
 
-            GridCacheAdapter<K, V> internalCache = proxy.context().cache();
+            GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = proxy.context().cache();
 
             if (internalCache.isNear())
                 internalCache = internalCache.context().near().dht();
 
-            GridCacheContext<K, V> cctx = internalCache.context();
+            GridCacheContext cctx = internalCache.context();
 
             long topVer = cctx.affinity().affinityTopologyVersion();
 
             GridCacheVersion ver = cctx.versions().next(topVer);
 
-            boolean portable = cctx.portableEnabled();
-
-            for (Map.Entry<K, V> e : entries) {
+            for (Map.Entry<KeyCacheObject, CacheObject> e : entries) {
                 try {
-// TODO IGNITE-51.
-//                    K key = e.getKey();
-//                    V val = e.getValue();
-//
-//                    if (portable) {
-//                        key = (K)cctx.marshalToPortable(key);
-//                        val = (V)cctx.marshalToPortable(val);
-//                    }
+                    e.getKey().finishUnmarshal(cctx, cctx.deploy().globalLoader());
 
-                    GridCacheEntryEx entry = internalCache.entryEx(cctx.toCacheKeyObject(e.getKey()), topVer);
+                    GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
 
                     entry.unswap(true, false);
 
-                    entry.initialValue(cctx.toCacheObject(e.getValue()),
+                    entry.initialValue(e.getValue(),
                         null,
                         ver,
                         CU.TTL_ETERNAL,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7db0122/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
index b8cfe77..115141d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.dr.*;
 import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.processors.dataload.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
@@ -31,7 +32,8 @@ import java.util.*;
 /**
  * Data center replication cache updater for data loader.
  */
-public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataLoader.Updater<K, V> {
+public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataLoader.Updater<K, V>,
+    GridDataLoadCacheUpdaters.InternalUpdater {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7db0122/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
index 0f246cd..880445f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java
@@ -149,16 +149,22 @@ public interface GridPortableProcessor extends GridProcessor {
     public boolean hasField(Object obj, String fieldName);
 
     /**
+     * @param cacheName Cache name.
+     * @return Cache object context.
+     */
+    public CacheObjectContext dataLoadContext(@Nullable String cacheName);
+
+    /**
      * @param obj Object.
      * @return Cache object.
      */
-    @Nullable public CacheObject toCacheObject(GridCacheContext ctx, @Nullable Object obj);
+    @Nullable public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj);
 
     /**
      * @param obj Key value.
      * @return Cache key object.
      */
-    public KeyCacheObject toCacheKeyObject(GridCacheContext ctx, Object obj);
+    public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj);
 
     /**
      * @param obj Value.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7db0122/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
index 14fd763..8738bf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
@@ -21,9 +21,9 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.client.marshaller.*;
-import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.portable.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.jetbrains.annotations.*;
 
 import java.nio.*;
@@ -125,12 +125,28 @@ public class GridOsPortableProcessor extends IgniteCacheObjectProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Nullable public KeyCacheObject toCacheKeyObject(GridCacheContext ctx, Object obj) {
+    @Override public CacheObjectContext dataLoadContext(@Nullable String cacheName) {
+        ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+
+        if (node == null)
+            throw new IllegalStateException("Cache doesn't exist: " + cacheName);
+
+        return new CacheObjectContext(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj) {
+        if (obj instanceof KeyCacheObject)
+            return (KeyCacheObject)obj;
+
         return new UserKeyCacheObjectImpl(obj);
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public CacheObject toCacheObject(GridCacheContext ctx, @Nullable Object obj) {
+    @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj) {
+        if (obj == null || obj instanceof CacheObject)
+            return (CacheObject)obj;
+
         return new UserCacheObjectImpl(obj);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7db0122/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMapEntry.java
index b9de437..cca1961 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMapEntry.java
@@ -35,8 +35,8 @@ public class GridMapEntry<K, V> implements Map.Entry<K, V>, Serializable {
     @GridToStringInclude
     private K key;
 
-    @GridToStringInclude
     /** */
+    @GridToStringInclude
     private V val;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7db0122/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
index 583069b..2b836b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
@@ -291,7 +291,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
 
             awaitPartitionMapExchange();
 
-            GridCache<Integer, Integer> cache = ((IgniteKernal)grid(0)).cache(null);
+            IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
 
             for (int i = 0; i < 100; i++)
                 cache.put(i, -1);
@@ -332,7 +332,8 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
                         GridCacheEntryEx entry = cache0.peekEx(key);
 
                         assertNotNull("Missing entry for key: " + key, entry);
-                        assertEquals((Integer)(key < 100 ? -1 : key), entry.rawGetOrUnmarshal(false));
+                        assertEquals((key < 100 ? -1 : key),
+                            CU.value(entry.rawGetOrUnmarshal(false), cache0.context(), false));
                     }
                 }
             }
@@ -871,6 +872,51 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testCustomUserUpdater() throws Exception {
+        useCache = true;
+
+        try {
+            Ignite ignite = startGrid(1);
+
+            startGrid(2);
+            startGrid(3);
+
+            try (IgniteDataLoader<String, TestObject> ldr = ignite.dataLoader(null)) {
+                ldr.allowOverwrite(true);
+
+                ldr.updater(new IgniteDataLoader.Updater<String, TestObject>() {
+                    @Override public void update(IgniteCache<String, TestObject> cache,
+                        Collection<Map.Entry<String, TestObject>> entries) {
+                        for (Map.Entry<String, TestObject> e : entries) {
+                            assertTrue(e.getKey() instanceof String);
+                            assertTrue(e.getValue() instanceof TestObject);
+
+                            cache.put(e.getKey(), new TestObject(e.getValue().val + 1));
+                        }
+                    }
+                });
+
+                for (int i = 0; i < 100; i++)
+                    ldr.addData(String.valueOf(i), new TestObject(i));
+            }
+
+            IgniteCache<String, TestObject> cache = ignite.jcache(null);
+
+            for (int i = 0; i < 100; i++) {
+                TestObject val = cache.get(String.valueOf(i));
+
+                assertNotNull(val);
+                assertEquals(i + 1, val.val);
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
      *
      */
     private static class TestObject {


Mime
View raw message