ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [63/92] [abbrv] incubator-ignite git commit: # ignite-51
Date Fri, 06 Mar 2015 10:14:39 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 342ebd0..f417311 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -770,32 +770,29 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                     GridCacheVersionConflictContext<?, ?> conflictCtx
= null;
 
                                     if (conflictNeedResolve) {
-// TODO IGNITE-51.
-//                                        IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K,
V>>
-//                                            conflictRes = conflictResolve(op, txEntry,
val, valBytes, explicitVer,
-//                                                cached);
-//
-//                                        assert conflictRes != null;
-//
-//                                        conflictCtx = conflictRes.get2();
-//
-//                                        if (conflictCtx.isUseOld())
-//                                            op = NOOP;
-//                                        else if (conflictCtx.isUseNew()) {
-//                                            txEntry.ttl(conflictCtx.ttl());
-//                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
-//                                        }
-//                                        else {
-//                                            assert conflictCtx.isMerge();
-//
-//                                            op = conflictRes.get1();
-//                                            val = conflictCtx.mergeValue();
-//                                            valBytes = null;
-//                                            explicitVer = writeVersion();
-//
-//                                            txEntry.ttl(conflictCtx.ttl());
-//                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
-//                                        }
+                                        IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext>
conflictRes =
+                                            conflictResolve(op, txEntry, val, explicitVer,
cached);
+
+                                        assert conflictRes != null;
+
+                                        conflictCtx = conflictRes.get2();
+
+                                        if (conflictCtx.isUseOld())
+                                            op = NOOP;
+                                        else if (conflictCtx.isUseNew()) {
+                                            txEntry.ttl(conflictCtx.ttl());
+                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
+                                        }
+                                        else {
+                                            assert conflictCtx.isMerge();
+
+                                            op = conflictRes.get1();
+                                            val = txEntry.context().toCacheObject(conflictCtx.mergeValue());
+                                            explicitVer = writeVersion();
+
+                                            txEntry.ttl(conflictCtx.ttl());
+                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
+                                        }
                                     }
                                     else
                                         // Nullify explicit version so that innerSet/innerRemove will work as usual.
@@ -1886,11 +1883,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public <K, V> IgniteInternalFuture<?> putAllDrAsync(
+    @Override public IgniteInternalFuture<?> putAllDrAsync(
         GridCacheContext cacheCtx,
-        Map<? extends K, GridCacheDrInfo<V>> drMap
+        Map<KeyCacheObject, GridCacheDrInfo> drMap
     ) {
-        return putAllAsync0(cacheCtx,
+        return this.<Object, Object>putAllAsync0(cacheCtx,
             null,
             null,
             null,
@@ -1918,9 +1915,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public <K> IgniteInternalFuture<?> removeAllDrAsync(
+    @Override public IgniteInternalFuture<?> removeAllDrAsync(
         GridCacheContext cacheCtx,
-        Map<? extends K, GridCacheVersion> drMap
+        Map<KeyCacheObject, GridCacheVersion> drMap
     ) {
         return removeAllAsync0(cacheCtx, null, drMap, null, false, null);
     }
@@ -1960,20 +1957,20 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      */
     protected <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite(
         final GridCacheContext cacheCtx,
-        Collection<? extends K> keys,
+        Collection<?> keys,
         @Nullable GridCacheEntryEx cached,
         @Nullable ExpiryPolicy expiryPlc,
         boolean implicit,
-        @Nullable Map<? extends K, ? extends V> lookup,
-        @Nullable Map<? extends K, EntryProcessor<K, V, Object>> invokeMap,
+        @Nullable Map<?, ?> lookup,
+        @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap,
         @Nullable Object[] invokeArgs,
         boolean retval,
         boolean lockOnly,
         CacheEntryPredicate[] filter,
         final GridCacheReturn<CacheObject> ret,
         Collection<KeyCacheObject> enlisted,
-        @Nullable Map<? extends K, GridCacheDrInfo<V>> drPutMap,
-        @Nullable Map<? extends K, GridCacheVersion> drRmvMap
+        @Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap,
+        @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap
     ) {
         assert cached == null || keys.size() == 1;
         assert cached == null || F.first(keys).equals(cached.key());
@@ -1998,14 +1995,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
             groupLockSanityCheck(cacheCtx, keys);
 
-            for (K key : keys) {
+            for (Object key : keys) {
                 if (key == null) {
                     setRollbackOnly();
 
                     throw new NullPointerException("Null key.");
                 }
 
-                V val = rmv || lookup == null ? null : lookup.get(key);
+                Object val = rmv || lookup == null ? null : lookup.get(key);
                 EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key);
 
                 GridCacheVersion drVer;
@@ -2013,7 +2010,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 long drExpireTime;
 
                 if (drPutMap != null) {
-                    GridCacheDrInfo<V> info = drPutMap.get(key);
+                    GridCacheDrInfo info = drPutMap.get(key);
 
                     assert info != null;
 
@@ -2510,7 +2507,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         @Nullable Map<? extends K, ? extends V> map,
         @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
         @Nullable final Object[] invokeArgs,
-        @Nullable final Map<? extends K, GridCacheDrInfo<V>> drMap,
+        @Nullable final Map<KeyCacheObject, GridCacheDrInfo> drMap,
         final boolean retval,
         @Nullable GridCacheEntryEx cached,
         @Nullable final CacheEntryPredicate[] filter
@@ -2523,14 +2520,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             needReturnValue(true);
 
         // Cached entry may be passed only from entry wrapper.
-        final Map<K, V> map0;
-        final Map<K, EntryProcessor<K, V, Object>> invokeMap0;
+        final Map<?, ?> map0;
+        final Map<?, EntryProcessor<K, V, Object>> invokeMap0;
 
         if (drMap != null) {
             assert map == null;
 
-            map0 = (Map<K, V>)F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo<V>,
V>() {
-                @Override public V apply(GridCacheDrInfo<V> val) {
+            map0 = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>()
{
+                @Override public Object apply(GridCacheDrInfo val) {
                     return val.value();
                 }
             });
@@ -2538,7 +2535,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             invokeMap0 = null;
         }
         else {
-            map0 = (Map<K, V>)map;
+            map0 = map;
             invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
         }
 
@@ -2573,7 +2570,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         }
 
         try {
-            Set<? extends K> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet();
+            Set<?> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet();
 
             Collection<KeyCacheObject> enlisted = new ArrayList<>();
 
@@ -2721,7 +2718,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     private <K, V> IgniteInternalFuture<GridCacheReturn<CacheObject>> removeAllAsync0(
         final GridCacheContext cacheCtx,
         @Nullable final Collection<? extends K> keys,
-        @Nullable Map<? extends  K, GridCacheVersion> drMap,
+        @Nullable Map<KeyCacheObject, GridCacheVersion> drMap,
         @Nullable GridCacheEntryEx cached,
         final boolean retval,
         @Nullable final CacheEntryPredicate[] filter) {
@@ -2730,7 +2727,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         if (retval)
             needReturnValue(true);
 
-        final Collection<? extends K> keys0;
+        final Collection<?> keys0;
 
         if (drMap != null) {
             assert keys == null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 1af6378..936e4e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -129,18 +129,18 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      * @param drMap DR map to put.
      * @return Future for DR put operation.
      */
-    public <K, V> IgniteInternalFuture<?> putAllDrAsync(
+    public IgniteInternalFuture<?> putAllDrAsync(
         GridCacheContext cacheCtx,
-        Map<? extends K, GridCacheDrInfo<V>> drMap);
+        Map<KeyCacheObject, GridCacheDrInfo> drMap);
 
     /**
      * @param cacheCtx Cache context.
      * @param drMap DR map.
      * @return Future for asynchronous remove.
      */
-    public <K> IgniteInternalFuture<?> removeAllDrAsync(
+    public IgniteInternalFuture<?> removeAllDrAsync(
         GridCacheContext cacheCtx,
-        Map<? extends K, GridCacheVersion> drMap);
+        Map<KeyCacheObject, GridCacheVersion> drMap);
 
     /**
      * Performs keys locking for affinity-based group lock transactions.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
index 23585fb..8235ec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
@@ -18,31 +18,26 @@
 package org.apache.ignite.internal.processors.cache.version;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.dataload.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.marshaller.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
-import java.util.*;
+import java.nio.*;
 
 /**
  * Raw versioned entry.
  */
-public class GridCacheRawVersionedEntry<K, V> implements GridCacheVersionedEntry<K,
V>, GridCacheVersionable,
-    Map.Entry<K, V>, Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Key. */
-    private K key;
-
+public class GridCacheRawVersionedEntry<K, V> extends IgniteDataLoaderEntry implements
+    GridCacheVersionedEntry<K, V>, GridCacheVersionable, Externalizable {
     /** Key bytes. */
+    @GridDirectTransient
     private byte[] keyBytes;
 
-    /** Value. */
-    private V val;
-
     /** Value bytes. */
     private byte[] valBytes;
 
@@ -63,21 +58,44 @@ public class GridCacheRawVersionedEntry<K, V> implements GridCacheVersionedEntry
     }
 
     /**
-     * Constructor.
+     * Constructor used for local store load when key and value are available.
      *
      * @param key Key.
-     * @param keyBytes Key bytes.
      * @param val Value.
-     * @param valBytes Value bytes.
      * @param expireTime Expire time.
      * @param ttl TTL.
      * @param ver Version.
      */
-    public GridCacheRawVersionedEntry(K key, @Nullable byte[] keyBytes, @Nullable V val,
@Nullable byte[] valBytes,
-        long ttl, long expireTime, GridCacheVersion ver) {
+    public GridCacheRawVersionedEntry(KeyCacheObject key,
+        @Nullable CacheObject val,
+        long ttl,
+        long expireTime,
+        GridCacheVersion ver) {
+        assert key != null;
+
         this.key = key;
-        this.keyBytes = keyBytes;
         this.val = val;
+        this.ttl = ttl;
+        this.expireTime = expireTime;
+        this.ver = ver;
+    }
+
+    /**
+     * Constructor used in receiver hub where marshalled key and value are available and we do not want to
+     * unmarshal value.
+     *
+     * @param keyBytes Key bytes.
+     * @param valBytes Value bytes.
+     * @param expireTime Expire time.
+     * @param ttl TTL.
+     * @param ver Version.
+     */
+    public GridCacheRawVersionedEntry(byte[] keyBytes,
+        byte[] valBytes,
+        long ttl,
+        long expireTime,
+        GridCacheVersion ver) {
+        this.keyBytes = keyBytes;
         this.valBytes = valBytes;
         this.ttl = ttl;
         this.expireTime = expireTime;
@@ -88,7 +106,14 @@ public class GridCacheRawVersionedEntry<K, V> implements GridCacheVersionedEntry
     @Override public K key() {
         assert key != null : "Entry is being improperly processed.";
 
-        return key;
+        return key.value(null, false);
+    }
+
+    /**
+     * @param key Key.
+     */
+    public void key(KeyCacheObject key) {
+        this.key = key;
     }
 
     /**
@@ -100,7 +125,7 @@ public class GridCacheRawVersionedEntry<K, V> implements GridCacheVersionedEntry
 
     /** {@inheritDoc} */
     @Override public V value() {
-        return val;
+        return val != null ? val.<V>value(null, false) : null;
     }
 
     /**
@@ -149,90 +174,205 @@ public class GridCacheRawVersionedEntry<K, V> implements GridCacheVersionedEntry
      * Perform internal unmarshal of this entry. It must be performed after entry is deserialized and before
      * its restored key/value are needed.
      *
+     * @param ctx Context.
      * @param marsh Marshaller.
      * @throws IgniteCheckedException If failed.
      */
-    public void unmarshal(Marshaller marsh) throws IgniteCheckedException {
-        unmarshalKey(marsh);
+    public void unmarshal(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException
{
+        unmarshalKey(ctx, marsh);
 
-        if (valBytes != null && val == null)
+        if (val == null && valBytes != null) {
             val = marsh.unmarshal(valBytes, null);
+
+            val.finishUnmarshal(ctx, null);
+        }
+    }
+
+    /**
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void unmarshal(CacheObjectContext ctx) throws IgniteCheckedException {
+        assert key != null;
+
+        key.finishUnmarshal(ctx, null);
+
+        if (val != null)
+            val.finishUnmarshal(ctx, null);
     }
 
     /**
      * Perform internal key unmarshal of this entry. It must be performed after entry is deserialized and before
      * its restored key/value are needed.
      *
+     * @param ctx Context.
      * @param marsh Marshaller.
      * @throws IgniteCheckedException If failed.
      */
-    public void unmarshalKey(Marshaller marsh) throws IgniteCheckedException {
-        if (key == null)
+    public void unmarshalKey(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException
{
+        if (key == null) {
+            assert keyBytes != null;
+
             key = marsh.unmarshal(keyBytes, null);
+
+            key.finishUnmarshal(ctx, null);
+        }
     }
 
     /**
      * Perform internal marshal of this entry before it will be serialized.
      *
+     * @param ctx Context.
      * @param marsh Marshaller.
      * @throws IgniteCheckedException If failed.
      */
-    public void marshal(Marshaller marsh) throws IgniteCheckedException {
-        if (keyBytes == null)
+    public void marshal(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException
{
+        if (keyBytes == null) {
+            key.prepareMarshal(ctx);
+
             keyBytes = marsh.marshal(key);
+        }
+
+        if (valBytes == null && val != null) {
+            val.prepareMarshal(ctx);
 
-        if (valBytes == null && val != null)
             valBytes = marsh.marshal(val);
+        }
+    }
+
+    /**
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void prepareDirectMarshal(CacheObjectContext ctx) throws IgniteCheckedException
{
+        key.prepareMarshal(ctx);
+
+        if (val != null)
+            val.prepareMarshal(ctx);
     }
 
     /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        assert keyBytes != null;
+    @Override public byte directType() {
+        return 103;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 2:
+                expireTime = reader.readLong("expireTime");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                ttl = reader.readLong("ttl");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                valBytes = reader.readByteArray("valBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
 
-        U.writeByteArray(out, keyBytes);
-        U.writeByteArray(out, valBytes);
+            case 5:
+                ver = reader.readMessage("ver");
 
-        out.writeLong(ttl);
+                if (!reader.isLastRead())
+                    return false;
 
-        if (ttl != 0)
-            out.writeLong(expireTime);
+                reader.incrementState();
 
-        out.writeObject(ver);
+        }
+
+        assert key != null;
+        assert !(val != null && valBytes != null);
+
+        return true;
     }
 
     /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
-        keyBytes = U.readByteArray(in);
-        valBytes = U.readByteArray(in);
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        assert key != null;
+        assert !(val != null && valBytes != null);
+
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 2:
+                if (!writer.writeLong("expireTime", expireTime))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeLong("ttl", ttl))
+                    return false;
 
-        ttl = in.readLong();
+                writer.incrementState();
 
-        if (ttl != 0)
-            expireTime = in.readLong();
+            case 4:
+                if (!writer.writeByteArray("valBytes", valBytes))
+                    return false;
 
-        ver = (GridCacheVersion)in.readObject();
+                writer.incrementState();
 
-        assert keyBytes != null;
+            case 5:
+                if (!writer.writeMessage("ver", ver))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheRawVersionedEntry.class, this, "keyBytesLen",
-            keyBytes != null ? keyBytes.length : "n/a", "valBytesLen", valBytes != null ?
valBytes.length : "n/a");
+    @Override public byte fieldsCount() {
+        return 6;
     }
 
     /** {@inheritDoc} */
-    @Override public K getKey() {
-        return key();
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        assert false;
     }
 
     /** {@inheritDoc} */
-    @Override public V getValue() {
-        return value();
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
+        assert false;
     }
 
     /** {@inheritDoc} */
-    @Override public V setValue(V val) {
-        throw new UnsupportedOperationException();
+    @Override public String toString() {
+        return S.toString(GridCacheRawVersionedEntry.class, this, "keyBytesLen",
+            keyBytes != null ? keyBytes.length : "n/a", "valBytesLen",
+            valBytes != null ? valBytes.length : "n/a");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
index 9a6cbd2..1fe8a25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.version;
 
+import org.apache.ignite.plugin.extensions.communication.*;
+
 import java.io.*;
+import java.nio.*;
 
 /**
  * Extended cache version which also has additional DR version.
@@ -87,6 +90,66 @@ public class GridCacheVersionEx extends GridCacheVersion {
     }
 
     /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 104;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 4:
+                if (!writer.writeMessage("drVer", drVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 4:
+                drVer = reader.readMessage("drVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException {
         super.readExternal(in);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 74b38f9..20dd6bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -37,7 +37,7 @@ import static org.apache.ignite.events.EventType.*;
  * like, for example GridCacheContext, as it may be reused between different
  * caches.
  */
-public class GridCacheVersionManager<K, V> extends GridCacheSharedManagerAdapter<K,
V> {
+public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
     /** Timestamp used as base time for cache topology version (January 1, 2014). */
     public static final long TOP_VER_BASE_TIME = 1388520000000L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
index 7d46e23..e54a281 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
@@ -57,6 +57,7 @@ class GridDataLoadUpdateJob implements GridPlainCallable<Object> {
      * @param cacheName Cache name.
      * @param col Entries to put.
      * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
+     * @param skipStore Skip store flag.
      * @param updater Updater.
      */
     GridDataLoadUpdateJob(

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderEntry.java
index 89bebe4..f719cfc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderEntry.java
@@ -31,11 +31,11 @@ import java.util.*;
 public class IgniteDataLoaderEntry implements Map.Entry<KeyCacheObject, CacheObject>,
Message {
     /** */
     @GridToStringInclude
-    private KeyCacheObject key;
+    protected KeyCacheObject key;
 
     /** */
     @GridToStringInclude
-    private CacheObject val;
+    protected CacheObject val;
 
     /**
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
index 30fd8bb..154e685 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
@@ -243,6 +243,13 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K,
V>, Delay
     }
 
     /**
+     * @return Cache object context.
+     */
+    public CacheObjectContext cacheObjectContext() {
+        return cacheObjCtx;
+    }
+
+    /**
      * Enters busy lock.
      */
     private void enterBusy() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
index 115141d..ae8c77b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
@@ -32,19 +32,20 @@ import java.util.*;
 /**
  * Data center replication cache updater for data loader.
  */
-public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataLoader.Updater<K,
V>,
+public class GridDrDataLoadCacheUpdater implements IgniteDataLoader.Updater<KeyCacheObject,
CacheObject>,
     GridDataLoadCacheUpdaters.InternalUpdater {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** {@inheritDoc} */
-    @Override public void update(IgniteCache<K, V> cache0, Collection<Map.Entry<K,
V>> col) {
+    @Override public void update(IgniteCache<KeyCacheObject, CacheObject> cache0,
+        Collection<Map.Entry<KeyCacheObject, CacheObject>> col) {
         try {
             String cacheName = cache0.getConfiguration(CacheConfiguration.class).getName();
 
             GridKernalContext ctx = ((IgniteKernal)cache0.unwrap(Ignite.class)).context();
             IgniteLogger log = ctx.log(GridDrDataLoadCacheUpdater.class);
-            GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName);
+            GridCacheAdapter cache = ctx.cache().internalCache(cacheName);
 
             assert !F.isEmpty(col);
 
@@ -56,20 +57,24 @@ public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataLoader.Update
             if (!f.isDone())
                 f.get();
 
-            for (Map.Entry<K, V> entry0 : col) {
-                GridCacheRawVersionedEntry<K, V> entry = (GridCacheRawVersionedEntry<K, V>)entry0;
+            CacheObjectContext cacheObjCtx = cache.context().cacheObjectContext();
 
-                entry.unmarshal(ctx.config().getMarshaller());
+            for (Map.Entry<KeyCacheObject, CacheObject> entry0 : col) {
+                GridCacheRawVersionedEntry entry = (GridCacheRawVersionedEntry)entry0;
 
-                K key = entry.key();
+                entry.unmarshal(cacheObjCtx, ctx.config().getMarshaller());
+
+                KeyCacheObject key = entry.getKey();
 
                 // Ensure that updater to not receive special-purpose values for TTL and expire time.
                 assert entry.ttl() != CU.TTL_NOT_CHANGED && entry.ttl() != CU.TTL_ZERO && entry.ttl() >= 0;
                 assert entry.expireTime() != CU.EXPIRE_TIME_CALCULATE && entry.expireTime() >= 0;
 
-                GridCacheDrInfo<V> val = entry.value() != null ? entry.ttl() != CU.TTL_ETERNAL ?
-                    new GridCacheDrExpirationInfo<>(entry.value(), entry.version(), entry.ttl(), entry.expireTime()) :
-                    new GridCacheDrInfo<>(entry.value(), entry.version()) : null;
+                CacheObject cacheVal = entry.getValue();
+
+                GridCacheDrInfo val = cacheVal != null ? entry.ttl() != CU.TTL_ETERNAL ?
+                    new GridCacheDrExpirationInfo(cacheVal, entry.version(), entry.ttl(), entry.expireTime()) :
+                    new GridCacheDrInfo(cacheVal, entry.version()) : null;
 
                 if (val == null)
                     cache.removeAllConflict(Collections.singletonMap(key, entry.version()));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerLoadTest.java
index 6b97f9a..1235a1d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerLoadTest.java
@@ -60,7 +60,7 @@ public class GridCacheTtlManagerLoadTest extends GridCacheTtlManagerSelfTest {
                 }
             }, 1);
 
-            GridCacheTtlManager<Object, Object> ttlMgr = g.internalCache().context().ttl();
+            GridCacheTtlManager ttlMgr = g.internalCache().context().ttl();
 
             for (int i = 0; i < 300; i++) {
                 U.sleep(1000);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index f1f58f9..644644e 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -48,7 +48,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
             new GridCacheSharedContext<>(
                 ctx,
                 new IgniteTxManager(),
-                new GridCacheVersionManager<K, V>(),
+                new GridCacheVersionManager(),
                 new GridCacheMvccManager(),
                 new GridCacheDeploymentManager<K, V>(),
                 new GridCachePartitionExchangeManager<K, V>(),
@@ -67,8 +67,8 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
             new CacheContinuousQueryManager<K, V>(),
             new GridCacheAffinityManager<K, V>(),
             new CacheDataStructuresManager<K, V>(),
-            new GridCacheTtlManager<K, V>(),
-            new GridOsCacheDrManager<K, V>(),
-            new CacheNoopJtaManager<K, V>());
+            new GridCacheTtlManager(),
+            new GridOsCacheDrManager(),
+            new CacheNoopJtaManager());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
index 6077e4a..7153c7d 100644
--- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
+++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
@@ -28,7 +28,7 @@ import javax.transaction.*;
 /**
  * Implementation of {@link CacheJtaManagerAdapter}.
  */
-public class CacheJtaManager<K, V> extends CacheJtaManagerAdapter<K, V> {
+public class CacheJtaManager extends CacheJtaManagerAdapter {
     /** */
     private final ThreadLocal<GridCacheXAResource> xaRsrc = new ThreadLocal<>();
 


Mime
View raw message