ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5075
Date Mon, 29 May 2017 08:55:08 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5075-pds 44d5e92bb -> dba7dbdf6


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dba7dbdf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dba7dbdf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dba7dbdf

Branch: refs/heads/ignite-5075-pds
Commit: dba7dbdf6b28c77de1b16ef38aa265ecf89a328a
Parents: 44d5e92
Author: sboikov <sboikov@gridgain.com>
Authored: Mon May 29 11:55:01 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon May 29 11:55:01 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/pagemem/PageUtils.java      |  16 ++
 .../MetaPageUpdatePartitionDataRecord.java      |  14 +-
 .../cache/database/tree/io/PageIO.java          |   6 +
 .../tree/io/PagePartitionCountersIO.java        | 173 +++++++++++++++++++
 .../database/tree/io/PagePartitionMetaIO.java   |  20 +++
 .../cache/database/GridCacheOffheapManager.java | 138 ++++++++++++++-
 .../wal/serializer/RecordV1Serializer.java      |   6 +-
 .../IgnitePersistentStoreCacheGroupsTest.java   |   2 +
 8 files changed, 368 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dba7dbdf/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java
index f824368..3fa5954 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java
@@ -138,6 +138,22 @@ public class PageUtils {
     /**
      * @param addr Address.
      * @param off Offset.
+     * @param bytes Bytes array.
+     * @param bytesOff Bytes array offset.
+     * @param len Length.
+     */
+    public static void putBytes(long addr, int off, byte[] bytes, int bytesOff, int len)
{
+        assert addr > 0 : addr;
+        assert off >= 0;
+        assert bytes != null;
+        assert bytesOff >= 0 && (bytesOff < bytes.length || bytes.length ==
0) : bytesOff;
+
+        GridUnsafe.copyMemory(bytes, GridUnsafe.BYTE_ARR_OFF + bytesOff, null, addr + off,
len);
+    }
+
+    /**
+     * @param addr Address.
+     * @param off Offset.
      * @param v Value.
      */
     public static void putByte(long addr, int off, byte v) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/dba7dbdf/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
index ef57c46..b28dd52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java
@@ -41,6 +41,9 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
     /** */
     private int allocatedIdxCandidate;
 
+    /** */
+    private long cntrsPageId;
+
     /**
      * @param cacheId Cache ID.
      * @param pageId Page ID.
@@ -52,7 +55,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
         long updateCntr,
         long globalRmvId,
         int partSize,
-        byte state,
+        long cntrsPageId, byte state,
         int allocatedIdxCandidate
     ) {
         super(cacheId, pageId);
@@ -62,6 +65,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord {
         this.partSize = partSize;
         this.state = state;
         this.allocatedIdxCandidate = allocatedIdxCandidate;
+        this.cntrsPageId = cntrsPageId;
     }
 
     /**
@@ -86,6 +90,13 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord
{
     }
 
     /**
+     * @return Partition size.
+     */
+    public long countersPageId() {
+        return cntrsPageId;
+    }
+
+    /**
      * @return Partition state
      */
     public byte state() {
@@ -99,6 +110,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord
{
         io.setUpdateCounter(pageAddr, updateCntr);
         io.setGlobalRemoveId(pageAddr, globalRmvId);
         io.setSize(pageAddr, partSize);
+        io.setCountersPageId(pageAddr, cntrsPageId);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/dba7dbdf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
index 927446b..e40ed11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
@@ -169,6 +169,9 @@ public abstract class PageIO {
     /** */
     public static final short T_CACHE_ID_AWARE_PENDING_REF_LEAF = 19;
 
+    /** */
+    public static final short T_PART_CNTRS = 20;
+
     /** Index for payload == 1. */
     public static final short T_H2_EX_REF_LEAF_START = 10000;
 
@@ -442,6 +445,9 @@ public abstract class PageIO {
             case T_PART_META:
                 return (Q)PagePartitionMetaIO.VERSIONS.forVersion(ver);
 
+            case T_PART_CNTRS:
+                return (Q)PagePartitionCountersIO.VERSIONS.forVersion(ver);
+
             case T_PAGE_UPDATE_TRACKING:
                 return (Q)TrackingPageIO.VERSIONS.forVersion(ver);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dba7dbdf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionCountersIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionCountersIO.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionCountersIO.java
new file mode 100644
index 0000000..34dba22
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionCountersIO.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.database.tree.io;
+
+import java.util.Map;
+import org.apache.ignite.internal.pagemem.PageUtils;
+
+/**
+ *
+ */
+public class PagePartitionCountersIO extends PageIO {
+    /** */
+    private static final int CNT_OFF = COMMON_HEADER_END;
+
+    /** */
+    private static final int LAST_FLAG_OFF = CNT_OFF + 2;
+
+    /** */
+    private static final int NEXT_COUNTERS_PAGE_OFF = LAST_FLAG_OFF + 1;
+
+    /** */
+    private static final int ITEMS_OFF = NEXT_COUNTERS_PAGE_OFF + 8;
+
+    /** */
+    private static final int ITEM_SIZE = 12;
+
+    /** */
+    private static final byte LAST_FLAG = 0b1;
+
+    /** */
+    public static final IOVersions<PagePartitionCountersIO> VERSIONS = new IOVersions<>(
+        new PagePartitionCountersIO(1)
+    );
+
+    /**
+     * @param ver Page format version.
+     */
+    public PagePartitionCountersIO(int ver) {
+        super(T_PART_CNTRS, ver);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initNewPage(long pageAddr, long pageId, int pageSize) {
+        super.initNewPage(pageAddr, pageId, pageSize);
+
+        setCount(pageAddr, 0);
+        setNextCountersPageId(pageAddr, 0);
+    }
+
+    /**
+     * @param pageAddr Page address.
+     * @return Next counters page ID or {@code 0} if it does not exist.
+     */
+    public long getNextCountersPageId(long pageAddr) {
+        return PageUtils.getLong(pageAddr, NEXT_COUNTERS_PAGE_OFF);
+    }
+
+    /**
+     * @param pageAddr Page address.
+     * @param partMetaPageId Next counters page ID.
+     */
+    public void setNextCountersPageId(long pageAddr, long partMetaPageId) {
+        PageUtils.putLong(pageAddr, NEXT_COUNTERS_PAGE_OFF, partMetaPageId);
+    }
+
+    /**
+     * @param pageSize Page size.
+     * @param pageAddr Page address.
+     * @param cacheSizes Serialized cache size items (pairs of cache ID and its size).
+     * @return Number of written pairs.
+     */
+    public int writeCacheSizes(int pageSize, long pageAddr, byte[] cacheSizes, int itemsOff)
{
+        assert cacheSizes != null;
+        assert cacheSizes.length % ITEM_SIZE == 0 : cacheSizes.length;
+
+        int cap = getCapacity(pageSize);
+
+        int items = (cacheSizes.length / ITEM_SIZE) - itemsOff;
+        int write = Math.min(cap, items);
+
+        PageUtils.putBytes(pageAddr, ITEMS_OFF, cacheSizes, itemsOff * ITEM_SIZE, write *
ITEM_SIZE);
+
+        setCount(pageAddr, write);
+
+        setLastFlag(pageAddr, write == items);
+
+        return write;
+    }
+
+    /**
+     * @param pageAddr Page address.
+     * @param res Result map of cache sizes.
+     * @return {@code True} if the map was fully read.
+     */
+    public boolean readCacheSizes(long pageAddr, Map<Integer, Long> res) {
+        int cnt = getCount(pageAddr);
+
+        assert cnt >= 0 && cnt <= Short.MAX_VALUE : cnt;
+
+        if (cnt == 0)
+            return true;
+
+        int off = ITEMS_OFF;
+
+        for (int i = 0; i < cnt; i++) {
+            int cacheId = PageUtils.getInt(pageAddr, off);
+            off += 4;
+
+            assert cacheId != 0;
+
+            long cacheSize = PageUtils.getLong(pageAddr, off);
+            off += 8;
+
+            assert cacheSize > 0 : cacheSize;
+
+            Long old = res.put(cacheId, cacheSize);
+
+            assert old == null;
+        }
+
+        return getLastFlag(pageAddr);
+    }
+
+    private boolean getLastFlag(long pageAddr) {
+        return PageUtils.getByte(pageAddr, LAST_FLAG_OFF) == LAST_FLAG;
+    }
+
+    private void setLastFlag(long pageAddr, boolean last) {
+        PageUtils.putByte(pageAddr, LAST_FLAG_OFF, last ? LAST_FLAG : ~LAST_FLAG);
+    }
+
+    /**
+     * @param pageAddr Page address.
+     * @return Stored items count.
+     */
+    private int getCount(long pageAddr) {
+        return PageUtils.getShort(pageAddr, CNT_OFF);
+    }
+
+    /**
+     * @param pageAddr Page address.
+     * @param cnt Stored items count.
+     */
+    private void setCount(long pageAddr, int cnt) {
+        assert cnt >= 0 && cnt <= Short.MAX_VALUE : cnt;
+
+        PageUtils.putShort(pageAddr, CNT_OFF, (short)cnt);
+    }
+
+    /**
+     * @param pageSize Page size.
+     * @return Maximum number of items which can be stored in buffer.
+     */
+    private int getCapacity(int pageSize) {
+        return (pageSize - ITEMS_OFF) / ITEM_SIZE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dba7dbdf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java
index aca0725..67cc5a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java
@@ -37,6 +37,9 @@ public class PagePartitionMetaIO extends PageMetaIO {
     private static final int PARTITION_STATE_OFF = GLOBAL_RMV_ID_OFF + 8;
 
     /** */
+    private static final int NEXT_PART_META_PAGE_OFF = PARTITION_STATE_OFF + 1;
+
+    /** */
     public static final IOVersions<PagePartitionMetaIO> VERSIONS = new IOVersions<>(
         new PagePartitionMetaIO(1)
     );
@@ -49,6 +52,7 @@ public class PagePartitionMetaIO extends PageMetaIO {
         setUpdateCounter(pageAddr, 0);
         setGlobalRemoveId(pageAddr, 0);
         setPartitionState(pageAddr, (byte)-1);
+        setCountersPageId(pageAddr, 0);
     }
 
     /**
@@ -120,4 +124,20 @@ public class PagePartitionMetaIO extends PageMetaIO {
     public void setPartitionState(long pageAddr, byte state) {
         PageUtils.putByte(pageAddr, PARTITION_STATE_OFF, state);
     }
+
+    /**
+     * @param pageAddr Page address.
+     * @return Next meta partial page ID or {@code 0} if it does not exist.
+     */
+    public long getCountersPageId(long pageAddr) {
+        return PageUtils.getLong(pageAddr, NEXT_PART_META_PAGE_OFF);
+    }
+
+    /**
+     * @param pageAddr Page address.
+     * @param metaPageId Next partial meta page ID.
+     */
+    public void setCountersPageId(long pageAddr, long metaPageId) {
+        PageUtils.putLong(pageAddr, NEXT_PART_META_PAGE_OFF, metaPageId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dba7dbdf/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java
b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java
index f14a534..bd2e9cb 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache.database;
 
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
@@ -51,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.database.pagemem.PageMemoryEx
 import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.database.tree.io.PageMetaIO;
 import org.apache.ignite.internal.processors.cache.database.tree.io.PagePartitionMetaIO;
+import org.apache.ignite.internal.processors.cache.database.tree.io.PagePartitionCountersIO;
 import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseListImpl;
 import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
@@ -58,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -179,11 +183,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl
imple
 
             freeList.saveMetadata();
 
-            // TODO IGNITE-5075: save cache sizes.
             long updCntr = store.updateCounter();
             int size = store.fullSize();
             long rmvId = globalRemoveId().get();
-            Map<Integer, Long> cacheSizes = grp.sharedGroup() ? store.cacheSizes()
: null;
 
             PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory();
             IgniteWriteAheadLogManager wal = this.ctx.wal();
@@ -222,6 +224,68 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl
imple
 
                         io.setPartitionState(pageAddr, (byte)state);
 
+                        long cntrsPageId;
+
+                        if (grp.sharedGroup()) {
+                            cntrsPageId = io.getCountersPageId(pageAddr);
+
+                            byte[] data = serializeCacheSizes(store.cacheSizes());
+
+                            int items = data.length / 12;
+                            int written = 0;
+                            int pageSize = pageMem.pageSize();
+
+                            boolean init = cntrsPageId == 0;
+
+                            if (init) {
+                                cntrsPageId = pageMem.allocatePage(grpId, store.partId(),
PageIdAllocator.FLAG_DATA);
+                                io.setCountersPageId(pageAddr, cntrsPageId);
+                            }
+
+                            long nextId = cntrsPageId;
+
+                            while (written != items) {
+                                final long curId = nextId;
+                                final long curPage = pageMem.acquirePage(grpId, curId);
+
+                                try {
+                                    final long curAddr = pageMem.writeLock(grpId, curId,
curPage);
+
+                                    assert curAddr != 0;
+
+                                    try {
+                                        PagePartitionCountersIO partMetaIo;
+
+                                        if (init) {
+                                            partMetaIo = PagePartitionCountersIO.VERSIONS.latest();
+                                            partMetaIo.initNewPage(curAddr, curId, pageSize);
+                                        }
+                                        else
+                                            partMetaIo = PageIO.getPageIO(curAddr);
+
+                                        written += partMetaIo.writeCacheSizes(pageSize, curAddr,
data, written);
+
+                                        nextId = partMetaIo.getNextCountersPageId(curAddr);
+
+                                        if(written != items && (init = nextId ==
0)) {
+                                            //allocate new counters page
+                                            nextId = pageMem.allocatePage(grpId, store.partId(),
PageIdAllocator.FLAG_DATA);
+                                            partMetaIo.setNextCountersPageId(curAddr, nextId);
+                                        }
+                                    }
+                                    finally {
+                                        // Write full page
+                                        pageMem.writeUnlock(grpId, curId, curPage, Boolean.TRUE,
true);
+                                    }
+                                }
+                                finally {
+                                    pageMem.releasePage(grpId, curId, curPage);
+                                }
+                            }
+                        }
+                        else
+                            cntrsPageId = 0L;
+
                         int pageCnt;
 
                         if (beforeSnapshot) {
@@ -275,6 +339,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl
imple
                                 updCntr,
                                 rmvId,
                                 size,
+                                cntrsPageId,
                                 (byte)state,
                                 pageCnt
                             ));
@@ -293,6 +358,32 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl
imple
     }
 
     /**
+     * @param cacheSizes Cache sizes.
+     * @return Serialized cache sizes
+     */
+    private byte[] serializeCacheSizes(Map<Integer, Long> cacheSizes) {
+        Set<Map.Entry<Integer, Long>> entries = cacheSizes.entrySet();
+        cacheSizes = new TreeMap<>();
+
+        // Sort and filter zero-sized caches.
+        for (Map.Entry<Integer, Long> entry : entries) {
+            if (entry.getValue() != 0)
+                cacheSizes.put(entry.getKey(), entry.getValue());
+        }
+
+        // Item size = 4 bytes (cache ID) + 8 bytes (cache size) = 12 bytes
+        byte[] data = new byte[cacheSizes.size() * 12];
+        long off = GridUnsafe.BYTE_ARR_OFF;
+
+        for (Map.Entry<Integer, Long> entry : cacheSizes.entrySet()) {
+            GridUnsafe.putInt(data, off, entry.getKey()); off += 4;
+            GridUnsafe.putLong(data, off, entry.getValue()); off += 8;
+        }
+
+        return data;
+    }
+
+    /**
      * @param map Map to add values to.
      * @param pageAddr page address
      * @param io Page Meta IO
@@ -820,8 +911,47 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl
imple
                             if (PageIO.getType(pageAddr) != 0) {
                                 PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.latest();
 
-                                // TODO IGNITE-5075.
-                                delegate0.init(io.getSize(pageAddr), io.getUpdateCounter(pageAddr),
null);
+                                Map<Integer, Long> cacheSizes = null;
+
+                                if (grp.sharedGroup()) {
+                                    long cntrsPageId = io.getCountersPageId(pageAddr);
+
+                                    if (cntrsPageId != 0L) {
+                                        cacheSizes = new HashMap<>();
+
+                                        long nextId = cntrsPageId;
+
+                                        while (true){
+                                            final long curId = nextId;
+                                            final long curPage = pageMem.acquirePage(grpId,
curId);
+
+                                            try {
+                                                final long curAddr = pageMem.readLock(grpId,
curId, curPage);
+
+                                                assert curAddr != 0;
+
+                                                try {
+                                                    PagePartitionCountersIO cntrsIO = PageIO.getPageIO(curAddr);
+
+                                                    if (cntrsIO.readCacheSizes(curAddr, cacheSizes))
+                                                        break;
+
+                                                    nextId = cntrsIO.getNextCountersPageId(curAddr);
+
+                                                    assert nextId != 0;
+                                                }
+                                                finally {
+                                                    pageMem.readUnlock(grpId, curId, curPage);
+                                                }
+                                            }
+                                            finally {
+                                                pageMem.releasePage(grpId, curId, curPage);
+                                            }
+                                        }
+                                    }
+                                }
+
+                                delegate0.init(io.getSize(pageAddr), io.getUpdateCounter(pageAddr),
cacheSizes);
 
                                 globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr));
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dba7dbdf/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
index cd9c41e..4f532ae 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/wal/serializer/RecordV1Serializer.java
@@ -197,6 +197,7 @@ public class RecordV1Serializer implements RecordSerializer {
                 buf.putLong(partDataRec.updateCounter());
                 buf.putLong(partDataRec.globalRemoveId());
                 buf.putInt(partDataRec.partitionSize());
+                buf.putLong(partDataRec.countersPageId());
                 buf.put(partDataRec.state());
                 buf.putInt(partDataRec.allocatedIndexCandidate());
 
@@ -756,10 +757,11 @@ public class RecordV1Serializer implements RecordSerializer {
                 long updCntr = in.readLong();
                 long rmvId = in.readLong();
                 int partSize = in.readInt();
+                long countersPageId = in.readLong();
                 byte state = in.readByte();
                 int allocatedIdxCandidate = in.readInt();
 
-                res = new MetaPageUpdatePartitionDataRecord(cacheId, pageId, updCntr, rmvId,
partSize, state, allocatedIdxCandidate);
+                res = new MetaPageUpdatePartitionDataRecord(cacheId, pageId, updCntr, rmvId,
partSize, countersPageId, state, allocatedIdxCandidate);
 
                 break;
 
@@ -1244,7 +1246,7 @@ public class RecordV1Serializer implements RecordSerializer {
                 return 1 + /*cache ID*/4 + /*page ID*/8 + /*ioType*/2  + /*ioVer*/2 +  /*tree
root*/8 + /*reuse root*/8 +  /*CRC*/4;
 
             case PARTITION_META_PAGE_UPDATE_COUNTERS:
-                return 1 + /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part
size*/4 + /*state*/ 1
+                return 1 + /*cache ID*/4 + /*page ID*/8 + /*upd cntr*/8 + /*rmv id*/8 + /*part
size*/4 + /*counters page id*/8 + /*state*/ 1
                     + /*allocatedIdxCandidate*/ 4 + /*CRC*/4;
 
             case MEMORY_RECOVERY:

http://git-wip-us.apache.org/repos/asf/ignite/blob/dba7dbdf/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java
b/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java
index d2a5177..c0c1b01 100644
--- a/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java
+++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java
@@ -94,6 +94,8 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
 
         cfg.setBinaryConfiguration(new BinaryConfiguration().setCompactFooter(false));
 
+        cfg.setBinaryConfiguration(new BinaryConfiguration().setCompactFooter(false));
+
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
 
         if (ccfgs != null) {


Mime
View raw message