Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3ECD9200C04 for ; Tue, 24 Jan 2017 13:19:24 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 3D4A0160B4B; Tue, 24 Jan 2017 12:19:24 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3C71F160B3D for ; Tue, 24 Jan 2017 13:19:23 +0100 (CET) Received: (qmail 55109 invoked by uid 500); 24 Jan 2017 12:19:22 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 55097 invoked by uid 99); 24 Jan 2017 12:19:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jan 2017 12:19:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3077EDFB0E; Tue, 24 Jan 2017 12:19:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: <5651db34f38c4f8c85dff5d7044b127a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-3477 wal record. Date: Tue, 24 Jan 2017 12:19:22 +0000 (UTC) archived-at: Tue, 24 Jan 2017 12:19:24 -0000 Repository: ignite Updated Branches: refs/heads/ignite-3477-1 fb48c50ff -> 5401b0fd6 ignite-3477 wal record. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5401b0fd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5401b0fd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5401b0fd Branch: refs/heads/ignite-3477-1 Commit: 5401b0fd672bbb04071a497d09388db162eba2ce Parents: fb48c50 Author: sboikov Authored: Tue Jan 24 15:19:13 2017 +0300 Committer: sboikov Committed: Tue Jan 24 15:19:13 2017 +0300 ---------------------------------------------------------------------- .../internal/pagemem/wal/record/WALRecord.java | 5 +- .../wal/record/delta/DataPageUpdateRecord.java | 79 ++++++++++++++++++++ .../cache/database/freelist/FreeListImpl.java | 44 ++++++++--- .../cache/database/tree/io/DataPageIO.java | 11 ++- 4 files changed, 125 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5401b0fd/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index 959ad7d..f761f68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -154,7 +154,10 @@ public abstract class WALRecord { PAGE_LIST_META_RESET_COUNT_RECORD, /** Switch segment record. */ - SWITCH_SEGMENT_RECORD + SWITCH_SEGMENT_RECORD, + + /** */ + DATA_PAGE_UPDATE_RECORD ; /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5401b0fd/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java new file mode 100644 index 0000000..65b7172 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagemem.wal.record.delta; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; + +/** + * Update existing record in data page. + */ +public class DataPageUpdateRecord extends PageDeltaRecord { + /** */ + private int itemId; + + /** */ + private byte[] payload; + + /** + * @param cacheId Cache ID. + * @param pageId Page ID. + * @param itemId Item ID. + * @param payload Record data. + */ + public DataPageUpdateRecord( + int cacheId, + long pageId, + int itemId, + byte[] payload + ) { + super(cacheId, pageId); + + this.payload = payload; + this.itemId = itemId; + } + + /** + * @return Item ID. + */ + public int itemId() { + return itemId; + } + + /** + * @return Insert record payload. + */ + public byte[] payload() { + return payload; + } + + /** {@inheritDoc} */ + @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { + assert payload != null; + + DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr); + + io.updateRow(pageAddr, itemId, pageMem.pageSize(), payload, null, 0); + } + + /** {@inheritDoc} */ + @Override public RecordType type() { + return RecordType.DATA_PAGE_UPDATE_RECORD; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5401b0fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java index 833df6e..87d5e4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.database.tree.io.CacheVersionIO; import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; @@ -77,7 +78,28 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { throws IgniteCheckedException { DataPageIO io = (DataPageIO)iox; - return io.updateRow(pageAddr, itemId, pageSize(), row, getRowSize(row)); + int rowSize = getRowSize(row); + + boolean updated = io.updateRow(pageAddr, itemId, pageSize(), null, row, getRowSize(row)); + + if (updated && isWalDeltaRecordNeeded(wal, page)) { + // TODO This record must contain only a reference to a logical WAL record with the actual data. + byte[] payload = new byte[rowSize]; + + DataPagePayload data = io.readPayload(pageAddr, itemId, pageSize()); + + assert data.payloadSize() == rowSize; + + PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize); + + wal.log(new DataPageUpdateRecord( + cacheId, + page.id(), + itemId, + payload)); + } + + return updated; } }; @@ -112,7 +134,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { /** * @param page Page. - * @param buf Buffer. + * @param pageAddr Page address. * @param io IO. * @param row Row. * @param rowSize Row size. @@ -121,22 +143,22 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { */ private int addRow( Page page, - long buf, + long pageAddr, DataPageIO io, CacheDataRow row, int rowSize ) throws IgniteCheckedException { - io.addRow(buf, row, rowSize, pageSize()); + io.addRow(pageAddr, row, rowSize, pageSize()); if (isWalDeltaRecordNeeded(wal, page)) { // TODO This record must contain only a reference to a logical WAL record with the actual data. byte[] payload = new byte[rowSize]; - DataPagePayload data = io.readPayload(buf, PageIdUtils.itemId(row.link()), pageSize()); + DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize()); assert data.payloadSize() == rowSize; - PageUtils.getBytes(buf, data.offset(), payload, 0, rowSize); + PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize); wal.log(new DataPageInsertRecord( cacheId, @@ -149,7 +171,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { /** * @param page Page. - * @param buf Buffer. + * @param pageAddr Page address. * @param io IO. * @param row Row. * @param written Written size. @@ -159,7 +181,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { */ private int addRowFragment( Page page, - long buf, + long pageAddr, DataPageIO io, CacheDataRow row, int written, @@ -168,7 +190,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { // Read last link before the fragment write, because it will be updated there. long lastLink = row.link(); - int payloadSize = io.addRowFragment(pageMem, buf, row, written, rowSize, pageSize()); + int payloadSize = io.addRowFragment(pageMem, pageAddr, row, written, rowSize, pageSize()); assert payloadSize > 0 : payloadSize; @@ -176,9 +198,9 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { // TODO This record must contain only a reference to a logical WAL record with the actual data. byte[] payload = new byte[payloadSize]; - DataPagePayload data = io.readPayload(buf, PageIdUtils.itemId(row.link()), pageSize()); + DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize()); - PageUtils.getBytes(buf, data.offset(), payload, 0, payloadSize); + PageUtils.getBytes(pageAddr, data.offset(), payload, 0, payloadSize); wal.log(new DataPageInsertFragmentRecord(cacheId, page.id(), payload, lastLink)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5401b0fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java index 08fa30e..fdb812f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.SB; +import org.jetbrains.annotations.Nullable; /** * Data pages IO. @@ -601,6 +602,7 @@ public class DataPageIO extends PageIO { * @param pageAddr Page address. * @param itemId Item ID. * @param pageSize Page size. + * @param payload Row data. * @param row Row. * @param rowSize Row size. * @return {@code True} if entry is not fragmented. @@ -610,16 +612,21 @@ public class DataPageIO extends PageIO { final long pageAddr, int itemId, int pageSize, - CacheDataRow row, + @Nullable byte[] payload, + @Nullable CacheDataRow row, final int rowSize) throws IgniteCheckedException { assert checkIndex(itemId) : itemId; + assert row != null ^ payload != null; final int dataOff = getDataOffset(pageAddr, itemId, pageSize); if (isFragmented(pageAddr, dataOff)) return false; - writeRowData(pageAddr, dataOff, rowSize, row, false); + if (row != null) + writeRowData(pageAddr, dataOff, rowSize, row, false); + else + writeRowData(pageAddr, dataOff, payload); return true; }