From: udo@apache.org
To: commits@geode.incubator.apache.org
Reply-To: dev@geode.incubator.apache.org
Date: Mon, 22 Feb 2016 21:44:13 -0000
Message-Id: <4462b94ec67747e68971ba69d2b89905@git.apache.org>
Subject: [085/100] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BytesAndBitsForCompactor.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BytesAndBitsForCompactor.java
index 0000000,3a3b5a1..cc358f5
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BytesAndBitsForCompactor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BytesAndBitsForCompactor.java
@@@ -1,0 -1,94 +1,94 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package com.gemstone.gemfire.internal.cache;
+
-import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+
+ /**
+  * Used to fetch a record's raw bytes and user bits.
+  * The actual data length in the byte array may be less than
+  * the size of the byte array itself. An integer field contains
+  * the valid length. This class is used exclusively by the Oplog Compactor
+  * for rolling the entries. The reason for this class is to reuse the
+  * underlying byte array for rolling multiple entries, thereby
+  * reducing the garbage.
+  * @author Asif
+  * @since 5.5
+  */
+ public class BytesAndBitsForCompactor {
+   /**
+    * If dataChunk is set then ignore the "data" and "validLength" fields.
+    * The dataChunk field is unretained so it can only be used while the RegionEntry is still synced.
+    * When done with the dataChunk, null it out if you want to reuse the byte[] later.
+    */
-   private @Unretained Chunk dataChunk;
++  private @Unretained ObjectChunk dataChunk;
+   private byte[] data;
+   private byte userBits = 0;
+   // length of the data present in the byte array
+   private int validLength;
+   private static final byte[] INIT_FOR_WRAPPER = new byte[0];
+   // boolean indicating if the object can be reused.
+   // Typically if the data stores the reference of a value byte[] directly
+   // from the RegionEntry then this byte array cannot be reused for
+   // storing another entry's data
+   private boolean isReusable;
+
+   public BytesAndBitsForCompactor() {
+     this.data = INIT_FOR_WRAPPER;
+     //this.userBits = userBits;
+     this.validLength = INIT_FOR_WRAPPER.length;
+     this.isReusable = true;
+   }
+
-   public final Chunk getDataChunk() {
++  public final ObjectChunk getDataChunk() {
+     return this.dataChunk;
+   }
+   public final byte[] getBytes() {
+     return this.data;
+   }
+   public final byte getBits() {
+     return this.userBits;
+   }
+
+   public final int getValidLength() {
+     return this.validLength;
+   }
+
+   public boolean isReusable() {
+     return this.isReusable;
+   }
+
+   /**
+    * @param data byte array storing the data
+    * @param userBits byte with appropriate bits set
+    * @param validLength the number of bytes representing the data, starting from offset 0
+    * @param isReusable true if this object is safe for reuse as a data holder
+    */
+   public void setData(byte[] data, byte userBits, int validLength, boolean isReusable) {
+     this.data = data;
+     this.userBits = userBits;
+     this.validLength = validLength;
+     this.isReusable = isReusable;
+   }
-   public void setChunkData(Chunk c, byte userBits) {
++  public void setChunkData(ObjectChunk c, byte userBits) {
+     this.dataChunk = c;
+     this.userBits = userBits;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
index 0000000,c855cca..327279b
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
@@@ -1,0 -1,2028 +1,2028 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + package com.gemstone.gemfire.internal.cache; + + import java.io.IOException; + import java.nio.ByteBuffer; + + import org.apache.logging.log4j.Logger; + + import com.gemstone.gemfire.cache.CacheClosedException; + import com.gemstone.gemfire.cache.DiskAccessException; + import com.gemstone.gemfire.distributed.internal.DM; + import com.gemstone.gemfire.internal.Assert; + import com.gemstone.gemfire.internal.ByteArrayDataInput; + import com.gemstone.gemfire.internal.HeapDataOutputStream; + import com.gemstone.gemfire.internal.Version; + import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ValueWrapper; + import com.gemstone.gemfire.internal.cache.DiskStoreImpl.AsyncDiskEntry; + import com.gemstone.gemfire.internal.cache.lru.EnableLRU; + import com.gemstone.gemfire.internal.cache.lru.LRUClockNode; + import com.gemstone.gemfire.internal.cache.lru.LRUEntry; + import com.gemstone.gemfire.internal.cache.persistence.BytesAndBits; + import com.gemstone.gemfire.internal.cache.persistence.DiskRecoveryStore; + import com.gemstone.gemfire.internal.cache.persistence.DiskRegionView; + import com.gemstone.gemfire.internal.cache.versions.VersionStamp; + import com.gemstone.gemfire.internal.cache.versions.VersionTag; + import com.gemstone.gemfire.internal.i18n.LocalizedStrings; + import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.offheap.Chunk; ++import com.gemstone.gemfire.internal.offheap.ObjectChunk; + import com.gemstone.gemfire.internal.offheap.OffHeapHelper; + import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper; + import com.gemstone.gemfire.internal.offheap.Releasable; + import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk; + import com.gemstone.gemfire.internal.offheap.StoredObject; + import com.gemstone.gemfire.internal.offheap.annotations.Released; + import com.gemstone.gemfire.internal.offheap.annotations.Retained; + import com.gemstone.gemfire.internal.offheap.annotations.Unretained; + import com.gemstone.gemfire.internal.util.BlobHelper; + + /** + * Represents an entry in an {@link RegionMap} whose value may be + * stored on disk. This interface provides accessor and mutator + * methods for a disk entry's state. This allows us to abstract all + * of the interesting behavior into a {@linkplain DiskEntry.Helper + * helper class} that we only need to implement once. + * + *

+ * + * Each DiskEntry has a unique id that is + * used by the {@link DiskRegion} to identify the key/value pair. + * Before the disk entry is written to disk, the value of the + * id is {@link DiskRegion#INVALID_ID invalid}. Once the + * object has been written to disk, the id is a positive + * number. If the value is {@linkplain Helper#update updated}, then the + * id is negated to signify that the value on disk is + * dirty. + * + * @see DiskRegion + * + * @author David Whitlock + * + * @since 3.2 + */ + public interface DiskEntry extends RegionEntry { + /** + * Sets the value with a {@link RegionEntryContext}. + * @param context the value's context. + * @param value an entry value. + */ + public void setValueWithContext(RegionEntryContext context,Object value); + + /** + * In some cases we need to do something just before we drop the value + * from a DiskEntry that is being moved (i.e. overflowed) to disk. + * @param context + */ + public void handleValueOverflow(RegionEntryContext context); + + /** + * In some cases we need to do something just after we unset the value + * from a DiskEntry that has been moved (i.e. overflowed) to disk. + * @param context + */ + public void afterValueOverflow(RegionEntryContext context); + + /** + * Returns true if the DiskEntry value is equal to {@link Token#DESTROYED}, {@link Token#REMOVED_PHASE1}, or {@link Token#REMOVED_PHASE2}. + */ + public boolean isRemovedFromDisk(); + + /** + * Returns the id of this DiskEntry + */ + public DiskId getDiskId(); + + public void _removePhase1(); + + public int updateAsyncEntrySize(EnableLRU capacityController); + + public DiskEntry getPrev(); + public DiskEntry getNext(); + public void setPrev(DiskEntry v); + public void setNext(DiskEntry v); + + /** + * Used as the entry value if it was invalidated. + */ + public static final byte[] INVALID_BYTES = new byte[0]; + /** + * Used as the entry value if it was locally invalidated. + */ + public static final byte[] LOCAL_INVALID_BYTES = new byte[0]; + /** + * Used as the entry value if it was tombstone. + */ + public static final byte[] TOMBSTONE_BYTES = new byte[0]; + + /////////////////////// Inner Classes ////////////////////// + + /** + * A Helper class for performing functions common to all + * DiskEntrys. + */ + public static class Helper { + private static final Logger logger = LogService.getLogger(); + + /** + * Testing purpose only + * Get the value of an entry that is on disk without faulting + * it in and without looking in the io buffer. + * @since 3.2.1 + */ + static Object getValueOnDisk(DiskEntry entry, DiskRegion dr) { + DiskId id = entry.getDiskId(); + if (id == null) { + return null; + } + dr.acquireReadLock(); + try { + synchronized (id) { + if (id == null + || (dr.isBackup() && id.getKeyId() == DiskRegion.INVALID_ID) + || (!entry.isValueNull() && id.needsToBeWritten() && !EntryBits.isRecoveredFromDisk(id.getUserBits()))/*fix for bug 41942*/) { + return null; + } + + return dr.getNoBuffer(id); + } + } finally { + dr.releaseReadLock(); + } + } + + /** + * Get the serialized value directly from disk. Returned object may be + * a {@link CachedDeserializable}. Goes straight to disk without faulting + * into memory. Only looks at the disk storage, not at heap storage. 
+ * @param entry the entry used to identify the value to fetch + * @param dr the persistent storage from which to fetch the value + * @return either null, byte array, or CacheDeserializable + * @since gemfire57_hotfix + */ + public static Object getSerializedValueOnDisk( + DiskEntry entry, DiskRegion dr) { + DiskId did = entry.getDiskId(); + if (did == null) { + return null; + } + dr.acquireReadLock(); + try { + synchronized (did) { + if (did == null + || (dr.isBackup() && did.getKeyId() == DiskRegion.INVALID_ID)) { + return null; + } else if (!entry.isValueNull() && did.needsToBeWritten() && !EntryBits.isRecoveredFromDisk(did.getUserBits())/*fix for bug 41942*/) { + return null; + } + return dr.getSerializedData(did); + } + } finally { + dr.releaseReadLock(); + } + } + + + /** + * Get the value of an entry that is on disk without + * faulting it in . It checks for the presence in the buffer also. + * This method is used for concurrent map operations, SQLFabric and CQ processing + * + * @throws DiskAccessException + * @since 5.1 + */ + static Object getValueOnDiskOrBuffer(DiskEntry entry, DiskRegion dr, RegionEntryContext context) { + @Released Object v = getOffHeapValueOnDiskOrBuffer(entry, dr, context); + if (v instanceof CachedDeserializable) { - if (v instanceof Chunk) { - @Released Chunk ohv = (Chunk) v; ++ if (v instanceof ObjectChunk) { ++ @Released ObjectChunk ohv = (ObjectChunk) v; + try { + v = ohv.getDeserializedValue(null, null); + if (v == ohv) { + throw new IllegalStateException("sqlf tried to use getValueOnDiskOrBuffer"); + } + } finally { + ohv.release(); // OFFHEAP the offheap ref is decremented here + } + } else { + v = ((CachedDeserializable)v).getDeserializedValue(null, null); + } + } + return v; + } + + @Retained + static Object getOffHeapValueOnDiskOrBuffer(DiskEntry entry, DiskRegion dr, RegionEntryContext context) { + DiskId did = entry.getDiskId(); + Object syncObj = did; + if (syncObj == null) { + syncObj = entry; + } + if (syncObj == did) { + dr.acquireReadLock(); + } + try { + synchronized (syncObj) { + if (did != null && did.isPendingAsync()) { + @Retained Object v = entry._getValueRetain(context, true); // TODO:KIRK:OK Rusty had Object v = entry.getValueWithContext(context); + if (Token.isRemovedFromDisk(v)) { + v = null; + } + return v; + } + if (did == null + || ( dr.isBackup() && did.getKeyId() == DiskRegion.INVALID_ID) + || (!entry.isValueNull() && did.needsToBeWritten() && !EntryBits.isRecoveredFromDisk(did.getUserBits()))/*fix for bug 41942*/) { + return null; + } + + return dr.getSerializedData(did); + } + } finally { + if (syncObj == did) { + dr.releaseReadLock(); + } + } + } + + /** + * Returns false if the entry is INVALID (or LOCAL_INVALID). Determines this + * without faulting in the value from disk. + * + * @since 3.2.1 + */ + /* TODO prpersist - Do we need this method? It was added by the sqlf merge + static boolean isValid(DiskEntry entry, DiskRegion dr) { + synchronized (entry) { + if (entry.isRecovered()) { + // We have a recovered entry whose value is still on disk. + // So take a peek at it without faulting it in. 
+ //long id = entry.getDiskId().getKeyId(); + //entry.getDiskId().setKeyId(-id); + byte bits = dr.getBits(entry.getDiskId()); + //TODO Asif:Check if resetting is needed + return !EntryBits.isInvalid(bits) && !EntryBits.isLocalInvalid(bits); + } + } + }*/ + + static boolean isOverflowedToDisk(DiskEntry de, DiskRegion dr, DistributedRegion.DiskPosition dp,RegionEntryContext context) { + Object v = null; + DiskId did; + synchronized (de) { + did = de.getDiskId(); + } + Object syncObj = did; + if (syncObj == null) { + syncObj = de; + } + if (syncObj == did) { + dr.acquireReadLock(); + } + try { + synchronized (syncObj) { + if (de.isValueNull()) { + if (did == null) { + synchronized (de) { + did = de.getDiskId(); + } + assert did != null; + return isOverflowedToDisk(de, dr, dp, context); + } else { + dp.setPosition(did.getOplogId(), did.getOffsetInOplog()); + return true; + } + } else { + return false; + } + } + } finally { + if (syncObj == did) { + dr.releaseReadLock(); + } + } + } + + /** + * Get the value of an entry that is on disk without faulting + * it in. + * @since 3.2.1 + */ + static boolean fillInValue(DiskEntry de, InitialImageOperation.Entry entry, + DiskRegion dr, DM mgr, ByteArrayDataInput in, RegionEntryContext context) { + @Retained @Released Object v = null; + DiskId did; + synchronized (de) { + did = de.getDiskId(); + } + Object syncObj = did; + if (syncObj == null) { + syncObj = de; + } + if (syncObj == did) { + dr.acquireReadLock(); + } + try { + synchronized (syncObj) { + entry.setLastModified(mgr, de.getLastModified()); + + ReferenceCountHelper.setReferenceCountOwner(entry); + v = de._getValueRetain(context, true); // OFFHEAP copied to heap entry; todo allow entry to refer to offheap since it will be copied to network. + ReferenceCountHelper.setReferenceCountOwner(null); + if (v == null) { + if (did == null) { + // fix for bug 41449 + synchronized (de) { + did = de.getDiskId(); + } + assert did != null; + // do recursive call to get readLock on did + return fillInValue(de, entry, dr, mgr, in, context); + } + if (logger.isDebugEnabled()) { + logger.debug("DiskEntry.Helper.fillInValue, key={}; getting value from disk, disk id={}", entry.key, did); + } + BytesAndBits bb = null; + try { + bb = dr.getBytesAndBits(did, false); + }catch(DiskAccessException dae){ + return false; + } + if (EntryBits.isInvalid(bb.getBits())) { + entry.setInvalid(); + } + else if (EntryBits.isLocalInvalid(bb.getBits())) { + entry.setLocalInvalid(); + } + else if (EntryBits.isTombstone(bb.getBits())) { + entry.setTombstone(); + } + else { + entry.value = bb.getBytes(); + entry.setSerialized(EntryBits.isSerialized(bb.getBits())); + } + return true; + } + } + } finally { + if (syncObj == did) { + dr.releaseReadLock(); + } + } + final boolean isEagerDeserialize = entry.isEagerDeserialize(); + if (isEagerDeserialize) { + entry.clearEagerDeserialize(); + } + if (Token.isRemovedFromDisk(v)) { + // fix for bug 31757 + return false; + } else if (v instanceof CachedDeserializable) { + try { + if (v instanceof StoredObject && !((StoredObject) v).isSerialized()) { + entry.setSerialized(false); + entry.value = ((StoredObject) v).getDeserializedForReading(); + + //For SQLFire we prefer eager deserialized + // if(v instanceof ByteSource) { + // entry.setEagerDeserialize(); + // } + } else { + // don't serialize here if it is not already serialized + + Object tmp = ((CachedDeserializable)v).getValue(); + //For SQLFire we prefer eager deserialized + // if(v instanceof ByteSource) { + // entry.setEagerDeserialize(); 
+ // } + if (tmp instanceof byte[]) { + byte[] bb = (byte[])tmp; + entry.value = bb; + entry.setSerialized(true); + } + else if (isEagerDeserialize && tmp instanceof byte[][]) { + // optimize for byte[][] since it will need to be eagerly deserialized + // for SQLFabric + entry.value = tmp; + entry.setEagerDeserialize(); + entry.setSerialized(true); + } + else { + try { + HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); + BlobHelper.serializeTo(tmp, hdos); + hdos.trim(); + entry.value = hdos; + entry.setSerialized(true); + } catch (IOException e) { + RuntimeException e2 = new IllegalArgumentException(LocalizedStrings.DiskEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING.toLocalizedString()); + e2.initCause(e); + throw e2; + } + } + } + } finally { + // If v == entry.value then v is assumed to be an OffHeapByteSource + // and release() will be called on v after the bytes have been read from + // off-heap. + if (v != entry.value) { + OffHeapHelper.releaseWithNoTracking(v); + } + } + } + else if (v instanceof byte[]) { + entry.value = v; + entry.setSerialized(false); + } + else if (isEagerDeserialize && v instanceof byte[][]) { + // optimize for byte[][] since it will need to be eagerly deserialized + // for SQLFabric + entry.value = v; + entry.setEagerDeserialize(); + } + else if (v == Token.INVALID) { + entry.setInvalid(); + } + else if (v == Token.LOCAL_INVALID) { + // fix for bug 31107 + entry.setLocalInvalid(); + } else if (v == Token.TOMBSTONE) { + entry.setTombstone(); + } + else { + Object preparedValue = v; + if (preparedValue != null) { + preparedValue = AbstractRegionEntry.prepareValueForGII(preparedValue); + if (preparedValue == null) { + return false; + } + } + if (CachedDeserializableFactory.preferObject()) { + entry.value = preparedValue; + entry.setEagerDeserialize(); + } + else { + try { + HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); + BlobHelper.serializeTo(preparedValue, hdos); + hdos.trim(); + entry.value = hdos; + entry.setSerialized(true); + } catch (IOException e) { + RuntimeException e2 = new IllegalArgumentException(LocalizedStrings.DiskEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING.toLocalizedString()); + e2.initCause(e); + throw e2; + } + } + } + return true; + } + + /** + * Used to initialize a new disk entry + */ + static void initialize(DiskEntry entry, DiskRecoveryStore r, Object newValue) { + DiskRegionView drv = null; + if (r instanceof LocalRegion) { + drv = ((LocalRegion)r).getDiskRegion(); + } else if (r instanceof DiskRegionView) { + drv = (DiskRegionView)r; + } + if (drv == null) { + throw new IllegalArgumentException(LocalizedStrings.DiskEntry_DISK_REGION_IS_NULL.toLocalizedString()); + } + + if (newValue == null || Token.isRemovedFromDisk(newValue)) { + // it is not in vm and it is not on disk + DiskId did = entry.getDiskId(); + if (did != null) { + did.setKeyId(DiskRegion.INVALID_ID); + } + } + else if (newValue instanceof RecoveredEntry) { + // Set the id directly, the value will also be set if RECOVER_VALUES + RecoveredEntry re = (RecoveredEntry)newValue; + DiskId did = entry.getDiskId(); + did.setOplogId(re.getOplogId()); + did.setOffsetInOplog(re.getOffsetInOplog()); + did.setKeyId(re.getRecoveredKeyId()); + did.setUserBits(re.getUserBits()); + did.setValueLength(re.getValueLength()); + if (re.getRecoveredKeyId() < 0) { + drv.incNumOverflowOnDisk(1L); + drv.incNumOverflowBytesOnDisk(did.getValueLength()); + incrementBucketStats(r, 0/*InVM*/, 1/*OnDisk*/, did.getValueLength()); + } + else { + 
entry.setValueWithContext(drv, entry.prepareValueForCache((RegionEntryContext) r, + re.getValue(), false)); + drv.incNumEntriesInVM(1L); + incrementBucketStats(r, 1/*InVM*/, 0/*OnDisk*/, 0); + } + } + else { + DiskId did = entry.getDiskId(); + if (did != null) { + did.setKeyId(DiskRegion.INVALID_ID); + } + drv.incNumEntriesInVM(1L); + incrementBucketStats(r, 1/*InVM*/, 0/*OnDisk*/, 0); + } + } + + private static final ValueWrapper INVALID_VW = new ByteArrayValueWrapper(true, INVALID_BYTES); + private static final ValueWrapper LOCAL_INVALID_VW = new ByteArrayValueWrapper(true, LOCAL_INVALID_BYTES); + private static final ValueWrapper TOMBSTONE_VW = new ByteArrayValueWrapper(true, TOMBSTONE_BYTES); + + public static interface ValueWrapper { + public boolean isSerialized(); + public int getLength(); + public byte getUserBits(); + public void sendTo(ByteBuffer bb, Flushable flushable) throws IOException; + public String getBytesAsString(); + } + public static interface Flushable { + public void flush() throws IOException; + + public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException; + } + public static class ByteArrayValueWrapper implements ValueWrapper { + public final boolean isSerializedObject; + public final byte[] bytes; + + public ByteArrayValueWrapper(boolean isSerializedObject, byte[] bytes) { + this.isSerializedObject = isSerializedObject; + this.bytes = bytes; + } + + @Override + public boolean isSerialized() { + return this.isSerializedObject; + } + + @Override + public int getLength() { + return (this.bytes != null) ? this.bytes.length : 0; + } + + private boolean isInvalidToken() { + return this == INVALID_VW; + } + + private boolean isLocalInvalidToken() { + return this == LOCAL_INVALID_VW; + } + + private boolean isTombstoneToken() { + return this == TOMBSTONE_VW; + } + + @Override + public byte getUserBits() { + byte userBits = 0x0; + if (isSerialized()) { + if (isTombstoneToken()) { + userBits = EntryBits.setTombstone(userBits, true); + } else if (isInvalidToken()) { + userBits = EntryBits.setInvalid(userBits, true); + } else if (isLocalInvalidToken()) { + userBits = EntryBits.setLocalInvalid(userBits, true); + } else { + if (this.bytes == null) { + throw new IllegalStateException("userBits==1 and value is null"); + } else if (this.bytes.length == 0) { + throw new IllegalStateException("userBits==1 and value is zero length"); + } + userBits = EntryBits.setSerialized(userBits, true); + } + } + return userBits; + } + + @Override + public void sendTo(ByteBuffer bb, Flushable flushable) throws IOException { + int offset = 0; + final int maxOffset = getLength(); + while (offset < maxOffset) { + int bytesThisTime = maxOffset - offset; + boolean needsFlush = false; + if (bytesThisTime > bb.remaining()) { + needsFlush = true; + bytesThisTime = bb.remaining(); + } + bb.put(this.bytes, offset, bytesThisTime); + offset += bytesThisTime; + if (needsFlush) { + flushable.flush(); + } + } + } + + @Override + public String getBytesAsString() { + if (this.bytes == null) { + return "null"; + } + StringBuffer sb = new StringBuffer(); + int len = getLength(); + for (int i = 0; i < len; i++) { + sb.append(this.bytes[i]).append(", "); + } + return sb.toString(); + } + } + + /** + * This class is a bit of a hack used by the compactor. + * For the compactor always copies to a byte[] so + * this class is just a simple wrapper. + * It is possible that the length of the byte array is greater + * than the actual length of the wrapped data. 
+ * At the time we create this we are all done with isSerialized + * and userBits so those methods are not supported. + */ + public static class CompactorValueWrapper extends ByteArrayValueWrapper { + private final int length; + + public CompactorValueWrapper(byte[] bytes, int length) { + super(false, bytes); + this.length = length; + } + + @Override + public boolean isSerialized() { + throw new UnsupportedOperationException(); + } + + @Override + public int getLength() { + return this.length; + } + + @Override + public byte getUserBits() { + throw new UnsupportedOperationException(); + } + } + + /** + * Note that the Chunk this ValueWrapper is created with + * is unretained so it must be used before the owner of + * the chunk releases it. + * Since the RegionEntry that has the value we are writing to + * disk has it retained we are ok as long as this ValueWrapper's + * life ends before the RegionEntry sync is released. + * Note that this class is only used with uncompressed chunks. + */ + public static class ChunkValueWrapper implements ValueWrapper { - private final @Unretained Chunk chunk; - public ChunkValueWrapper(Chunk c) { ++ private final @Unretained ObjectChunk chunk; ++ public ChunkValueWrapper(ObjectChunk c) { + assert !c.isCompressed(); + this.chunk = c; + } + @Override + public boolean isSerialized() { + return this.chunk.isSerialized(); + } + @Override + public int getLength() { + return this.chunk.getDataSize(); + } + @Override + public byte getUserBits() { + byte userBits = 0x0; + if (isSerialized()) { + userBits = EntryBits.setSerialized(userBits, true); + } + return userBits; + } + @Override + public void sendTo(ByteBuffer bb, Flushable flushable) throws IOException { + final int maxOffset = getLength(); + if (maxOffset == 0) { + return; + } + if (maxOffset > bb.capacity()) { + ByteBuffer chunkbb = this.chunk.createDirectByteBuffer(); + if (chunkbb != null) { + flushable.flush(bb, chunkbb); + return; + } + } - final long bbAddress = Chunk.getDirectByteBufferAddress(bb); ++ final long bbAddress = ObjectChunk.getDirectByteBufferAddress(bb); + if (bbAddress != 0L) { + int bytesRemaining = maxOffset; + int availableSpace = bb.remaining(); + long addrToWrite = bbAddress + bb.position(); + long addrToRead = this.chunk.getAddressForReading(0, maxOffset); + if (bytesRemaining > availableSpace) { + do { + UnsafeMemoryChunk.copyMemory(addrToRead, addrToWrite, availableSpace); + bb.position(bb.position()+availableSpace); + addrToRead += availableSpace; + bytesRemaining -= availableSpace; + flushable.flush(); + addrToWrite = bbAddress + bb.position(); + availableSpace = bb.remaining(); + } while (bytesRemaining > availableSpace); + } + UnsafeMemoryChunk.copyMemory(addrToRead, addrToWrite, bytesRemaining); + bb.position(bb.position()+bytesRemaining); + } else { + long addr = this.chunk.getAddressForReading(0, maxOffset); + final long endAddr = addr + maxOffset; + while (addr != endAddr) { + bb.put(UnsafeMemoryChunk.readAbsoluteByte(addr)); + addr++; + if (!bb.hasRemaining()) { + flushable.flush(); + } + } + } + } + @Override + public String getBytesAsString() { + return this.chunk.getStringForm(); + } + } + + public static ValueWrapper createValueWrapper(Object value, EntryEventImpl event) { + if (value == Token.INVALID) { + // even though it is not serialized we say it is because + // bytes will never be an empty array when it is serialized + // so that gives us a way to specify the invalid value + // given a byte array and a boolean flag. 
+ return INVALID_VW; + } + else if (value == Token.LOCAL_INVALID) { + // even though it is not serialized we say it is because + // bytes will never be an empty array when it is serialized + // so that gives us a way to specify the local-invalid value + // given a byte array and a boolean flag. + return LOCAL_INVALID_VW; + } + else if (value == Token.TOMBSTONE) { + return TOMBSTONE_VW; + } + else { + boolean isSerializedObject = true; + byte[] bytes; + if (value instanceof CachedDeserializable) { + CachedDeserializable proxy = (CachedDeserializable)value; - if (proxy instanceof Chunk) { - return new ChunkValueWrapper((Chunk) proxy); ++ if (proxy instanceof ObjectChunk) { ++ return new ChunkValueWrapper((ObjectChunk) proxy); + } + if (proxy instanceof StoredObject) { + StoredObject ohproxy = (StoredObject) proxy; + isSerializedObject = ohproxy.isSerialized(); + if (isSerializedObject) { + bytes = ohproxy.getSerializedValue(); + } else { + bytes = (byte[]) ohproxy.getDeserializedForReading(); + } + } else { + bytes = proxy.getSerializedValue(); + } + if (event != null && isSerializedObject) { + event.setCachedSerializedNewValue(bytes); + } + } + else if (value instanceof byte[]) { + isSerializedObject = false; + bytes = (byte[])value; + } + else { + Assert.assertTrue(!Token.isRemovedFromDisk(value)); + if (event != null && event.getCachedSerializedNewValue() != null) { + bytes = event.getCachedSerializedNewValue(); + } else { + bytes = EntryEventImpl.serialize(value); + if (bytes.length == 0) { + throw new IllegalStateException("serializing <" + value + "> produced empty byte array"); + } + if (event != null) { + event.setCachedSerializedNewValue(bytes); + } + } + } + return new ByteArrayValueWrapper(isSerializedObject, bytes); + } + } + public static ValueWrapper createValueWrapperFromEntry(DiskEntry entry, LocalRegion region, EntryEventImpl event) { + if (event != null) { + // For off-heap it should be faster to pass a reference to the + // StoredObject instead of using the cached byte[] (unless it is also compressed). + // Since NIO is used if the chunk of memory is large we can write it + // to the file with using the off-heap memory with no extra copying. + // So we give preference to getRawNewValue over getCachedSerializedNewValue + Object rawValue = null; + if (!event.hasDelta()) { + // We don't do this for the delta case because getRawNewValue returns delta + // and we want to write the entire new value to disk. 
+ rawValue = event.getRawNewValue(); - if (rawValue instanceof Chunk) { - return new ChunkValueWrapper((Chunk) rawValue); ++ if (rawValue instanceof ObjectChunk) { ++ return new ChunkValueWrapper((ObjectChunk) rawValue); + } + } + if (event.getCachedSerializedNewValue() != null) { + return new ByteArrayValueWrapper(true, event.getCachedSerializedNewValue()); + } + if (rawValue != null) { + return createValueWrapper(rawValue, event); + } + } + // TODO OFFHEAP: No need to retain since we hold the sync on entry but we need a flavor of _getValue that will decompress + @Retained Object value = entry._getValueRetain(region, true); + try { + return createValueWrapper(value, event); + } finally { + OffHeapHelper.release(value); + } + } + + private static void writeToDisk(DiskEntry entry, LocalRegion region, boolean async) throws RegionClearedException { + writeToDisk(entry, region, async, null); + } + + /** + * Writes the key/value object stored in the given entry to disk + * @throws RegionClearedException + * + * @see DiskRegion#put + */ + private static void writeToDisk(DiskEntry entry, LocalRegion region, boolean async, EntryEventImpl event) throws RegionClearedException { + writeBytesToDisk(entry, region, async, createValueWrapperFromEntry(entry, region, event)); + } + + private static void writeBytesToDisk(DiskEntry entry, LocalRegion region, boolean async, ValueWrapper vw) throws RegionClearedException { + // @todo does the following unmark need to be called when an async + // write is scheduled or is it ok for doAsyncFlush to do it? + entry.getDiskId().unmarkForWriting(); + region.getDiskRegion().put(entry, region, vw, async); + } + + public static void update(DiskEntry entry, LocalRegion region, Object newValue) throws RegionClearedException { + update(entry, region, newValue, null); + } + /** + * Updates the value of the disk entry with a new value. This allows us to + * free up disk space in the non-backup case. 
+ * + * @throws RegionClearedException + */ + public static void update(DiskEntry entry, LocalRegion region, Object newValue, EntryEventImpl event) throws RegionClearedException { + DiskRegion dr = region.getDiskRegion(); + if (newValue == null) { + throw new NullPointerException(LocalizedStrings.DiskEntry_ENTRYS_VALUE_SHOULD_NOT_BE_NULL.toLocalizedString()); + } + + //If we have concurrency checks enabled for a persistent region, we need + //to add an entry to the async queue for every update to maintain the RVV + boolean maintainRVV = region.concurrencyChecksEnabled && dr.isBackup(); + + Token oldValue = null; + int oldValueLength = 0; + boolean scheduleAsync = false; + boolean callRemoveFromDisk = false; + DiskId did = entry.getDiskId(); + VersionTag tag = null; + Object syncObj = did; + if (syncObj == null) { + syncObj = entry; + } + if (syncObj == did) { + dr.acquireReadLock(); + } + try { + synchronized (syncObj) { + oldValue = entry.getValueAsToken(); + if (Token.isRemovedFromDisk(newValue)) { + if (dr.isBackup()) { + dr.testIsRecoveredAndClear(did); // fixes bug 41409 + } + RuntimeException rte = null; + try { + if (!Token.isRemovedFromDisk(oldValue)) { + // removeFromDisk takes care of oldValueLength + if (dr.isSync()) { + removeFromDisk(entry, region, false); + } else { + callRemoveFromDisk = true; // do it outside the sync + } + } + } catch (RuntimeException e) { + rte = e; + throw e; + } + finally { + if (rte != null && (rte instanceof CacheClosedException)) { + // 47616: not to set the value to be removedFromDisk since it failed to persist + } else { + // Asif Ensure that the value is rightly set despite clear so + // that it can be distributed correctly + entry.setValueWithContext(region, newValue); // OFFHEAP newValue was already preparedForCache + } + } + } + else if (newValue instanceof RecoveredEntry) { + // Now that oplog creates are immediately put in cache + // a later oplog modify will get us here + RecoveredEntry re = (RecoveredEntry)newValue; + long oldKeyId = did.getKeyId(); + long oldOplogId = did.getOplogId(); + long newOplogId = re.getOplogId(); + if (newOplogId != oldOplogId) { + did.setOplogId(newOplogId); + re.setOplogId(oldOplogId); // so caller knows oldoplog id + } + did.setOffsetInOplog(re.getOffsetInOplog()); + // id already set + did.setUserBits(re.getUserBits()); + oldValueLength = did.getValueLength(); + did.setValueLength(re.getValueLength()); + // The following undo and then do fixes bug 41849 + // First, undo the stats done for the previous recovered value + if (oldKeyId < 0) { + dr.incNumOverflowOnDisk(-1L); + dr.incNumOverflowBytesOnDisk(-oldValueLength); + incrementBucketStats(region, 0/*InVM*/, -1/*OnDisk*/, -oldValueLength); + } else { + dr.incNumEntriesInVM(-1L); + incrementBucketStats(region, -1/*InVM*/, 0/*OnDisk*/, 0); + } + // Second, do the stats done for the current recovered value + if (re.getRecoveredKeyId() < 0) { + if (!entry.isValueNull()) { + try { + entry.handleValueOverflow(region); + entry.setValueWithContext(region, null); // fixes bug 41119 + }finally { + entry.afterValueOverflow(region); + } + + } + dr.incNumOverflowOnDisk(1L); + dr.incNumOverflowBytesOnDisk(did.getValueLength()); + incrementBucketStats(region, 0/*InVM*/, 1/*OnDisk*/, + did.getValueLength()); + } else { + entry.setValueWithContext(region, entry.prepareValueForCache(region, re.getValue(), false)); + dr.incNumEntriesInVM(1L); + incrementBucketStats(region, 1/*InVM*/, 0/*OnDisk*/, 0); + } + } + else { + //The new value in the entry needs to be set after the 
disk writing + // has succeeded. If not , for GemFireXD , it is possible that other thread + // may pick this transient value from region entry ( which for + //offheap will eventually be released ) as index key, + //given that this operation is bound to fail in case of + //disk access exception. + + //entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared + + if(did != null && did.isPendingAsync()) { + //if the entry was not yet written to disk, we didn't update + //the bytes on disk. + oldValueLength = 0; + } else { + oldValueLength = getValueLength(did); + } + + if (dr.isBackup()) { + dr.testIsRecoveredAndClear(did); // fixes bug 41409 + if (dr.isSync()) { + //In case of compression the value is being set first + // because atleast for now , GemFireXD does not support compression + // if and when it does support, this needs to be taken care of else + // we risk Bug 48965 + if (AbstractRegionEntry.isCompressible(dr, newValue)) { + entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared + + // newValue is prepared and compressed. We can't write compressed values to disk. + writeToDisk(entry, region, false, event); + } else { + writeBytesToDisk(entry, region, false, createValueWrapper(newValue, event)); + entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared + } + + } else if (did.isPendingAsync() && !maintainRVV) { + entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared + + // nothing needs to be done except + // fixing up LRU stats + // @todo fixup LRU stats if needed + // I'm not sure anything needs to be done here. + // If we have overflow and it decided to evict this entry + // how do we handle that case when we are async? + // Seems like the eviction code needs to leave the value + // in memory until the pendingAsync is done. + } else { + //if the entry is not async, we need to schedule it + //for regions with concurrency checks enabled, we add an entry + //to the queue for every entry. + scheduleAsync = true; + did.setPendingAsync(true); + VersionStamp stamp = entry.getVersionStamp(); + if(stamp != null) { + tag = stamp.asVersionTag(); + } + entry.setValueWithContext(region, newValue); + } + } else if (did != null) { + entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared + + // Mark the id as needing to be written + // The disk remove that this section used to do caused bug 30961 + // @todo this seems wrong. How does leaving it on disk fix the bug? + did.markForWriting(); + //did.setValueSerializedSize(0); + }else { + entry.setValueWithContext(region, newValue); + } + + if (Token.isRemovedFromDisk(oldValue)) { + // Note we now initialize entries removed and then set their + // value once we find no existing entry. + // So this is the normal path for a brand new entry. 
+ dr.incNumEntriesInVM(1L); + incrementBucketStats(region, 1/*InVM*/, 0/*OnDisk*/, 0); + } + } + if (entry instanceof LRUEntry) { + LRUEntry le = (LRUEntry)entry; + boolean wasEvicted = le.testEvicted(); + le.unsetEvicted(); + if (!Token.isRemovedFromDisk(newValue)) { + if (oldValue == null + // added null check for bug 41759 + || wasEvicted && did != null && did.isPendingAsync()) { + // Note we do not append this entry because that will be + // done by lruEntryUpdate + dr.incNumEntriesInVM(1L); + dr.incNumOverflowOnDisk(-1L); + dr.incNumOverflowBytesOnDisk(-oldValueLength); + incrementBucketStats(region, 1/*InVM*/, -1/*OnDisk*/, -oldValueLength); + } + } + } + } + } finally { + if (syncObj == did) { + dr.releaseReadLock(); + } + } + if (callRemoveFromDisk) { + removeFromDisk(entry, region, false, oldValue == null, false); + } else if (scheduleAsync && did.isPendingAsync()) { + // this needs to be done outside the above sync + scheduleAsyncWrite(new AsyncDiskEntry(region, entry, tag)); + } + } + + private static int getValueLength(DiskId did) { + int result = 0; + if (did != null) { + synchronized (did) { + result = did.getValueLength(); + } + } + return result; + } + + public static void updateRecoveredEntry(PlaceHolderDiskRegion drv, + DiskEntry entry, + RecoveredEntry newValue,RegionEntryContext context) + { + if (newValue == null) { + throw new NullPointerException(LocalizedStrings.DiskEntry_ENTRYS_VALUE_SHOULD_NOT_BE_NULL.toLocalizedString()); + } + DiskId did = entry.getDiskId(); + synchronized (did) { + boolean oldValueWasNull = entry.isValueNull(); + int oldValueLength = did.getValueLength(); + // Now that oplog creates are immediately put in cache + // a later oplog modify will get us here + long oldOplogId = did.getOplogId(); + long newOplogId = newValue.getOplogId(); + if (newOplogId != oldOplogId) { + did.setOplogId(newOplogId); + newValue.setOplogId(oldOplogId); // so caller knows oldoplog id + } + did.setOffsetInOplog(newValue.getOffsetInOplog()); + // id already set + did.setUserBits(newValue.getUserBits()); + did.setValueLength(newValue.getValueLength()); + if (newValue.getRecoveredKeyId() >= 0) { + entry.setValueWithContext(context, entry.prepareValueForCache(drv, newValue.getValue(), + false)); + } else { + if (!oldValueWasNull) { + try { + entry.handleValueOverflow(context); + entry.setValueWithContext(context,null); // fixes bug 41119 + }finally { + entry.afterValueOverflow(context); + } + } + } + if (entry instanceof LRUEntry) { + LRUEntry le = (LRUEntry)entry; + assert !le.testEvicted(); + // we don't allow eviction during recovery + if (oldValueWasNull) { + // Note we do not append this entry because that will be + // done by lruEntryUpdate + drv.incNumEntriesInVM(1L); + drv.incNumOverflowOnDisk(-1L); + drv.incNumOverflowBytesOnDisk(-oldValueLength); + //No need to call incrementBucketStats here because we don't have + //a real bucket region, this is during recovery from disk. 
+ } + } + } + } + + public static Object getValueInVMOrDiskWithoutFaultIn(DiskEntry entry, LocalRegion region) { + Object result = OffHeapHelper.copyAndReleaseIfNeeded(getValueOffHeapOrDiskWithoutFaultIn(entry, region)); + if (result instanceof CachedDeserializable) { + result = ((CachedDeserializable)result).getDeserializedValue(null, null); + } + if (result instanceof StoredObject) { + ((StoredObject) result).release(); + throw new IllegalStateException("sqlf tried to use getValueInVMOrDiskWithoutFaultIn"); + } + return result; + } + + @Retained + public static Object getValueOffHeapOrDiskWithoutFaultIn(DiskEntry entry, LocalRegion region) { + @Retained Object v = entry._getValueRetain(region, true); // TODO:KIRK:OK Object v = entry.getValueWithContext(region); + if (v == null || Token.isRemovedFromDisk(v) + && !region.isIndexCreationThread()) { + synchronized (entry) { + v = entry._getValueRetain(region, true); // TODO:KIRK:OK v = entry.getValueWithContext(region); + if (v == null) { + v = Helper.getOffHeapValueOnDiskOrBuffer(entry, region.getDiskRegion(),region); + } + } + } + if (Token.isRemovedFromDisk(v)) { + // fix for bug 31800 + v = null; + // } else if (v instanceof ByteSource) { + // // If the ByteSource contains a Delta or ListOfDelta then we want to deserialize it + // Object deserVal = ((CachedDeserializable)v).getDeserializedForReading(); + // if (deserVal != v) { + // OffHeapHelper.release(v); + // v = deserVal; + // } + } + return v; + } + + /** + * + * @param entry + * @param region + * @return Value + * @throws DiskAccessException + */ + public static Object faultInValue(DiskEntry entry, LocalRegion region) { + return faultInValue(entry, region, false); + } + @Retained + public static Object faultInValueRetain(DiskEntry entry, LocalRegion region) { + return faultInValue(entry, region, true); + } + /** + * @param retainResult if true then the result may be a retained off-heap reference + */ + @Retained + private static Object faultInValue(DiskEntry entry, LocalRegion region, boolean retainResult) + { + DiskRegion dr = region.getDiskRegion(); + @Retained Object v = entry._getValueRetain(region, true); // TODO:KIRK:OK Object v = entry.getValueWithContext(region); + boolean lruFaultedIn = false; + boolean done = false; + try { + //Asif: If the entry is instance of LRU then DidkRegion cannot be null. + //Since SqlFabric is accessing this method direcly & it passes the owning region, + //if the region happens to be persistent PR type, the owning region passed is PR, + // but it will have DiskRegion as null. SqlFabric takes care of passing owning region + // as BucketRegion in case of Overflow type entry. This is fix for Bug # 41804 + if ( entry instanceof LRUEntry && !dr.isSync() ) { + synchronized (entry) { + DiskId did = entry.getDiskId(); + if (did != null && did.isPendingAsync()) { + done = true; + // See if it is pending async because of a faultOut. + // If so then if we are not a backup then we can unschedule the pending async. + // In either case we need to do the lruFaultIn logic. + boolean evicted = ((LRUEntry)entry).testEvicted(); + if (evicted) { + if (!dr.isBackup()) { + // @todo do we also need a bit that tells us if it is in the async queue? + // Seems like we could end up adding it to the queue multiple times. + did.setPendingAsync(false); + } + // since it was evicted fix the stats here + dr.incNumEntriesInVM(1L); + dr.incNumOverflowOnDisk(-1L); + // no need to dec overflowBytesOnDisk because it was not inced in this case. 
+ incrementBucketStats(region, 1/*InVM*/, -1/*OnDisk*/, 0); + } + lruEntryFaultIn((LRUEntry) entry, region); + lruFaultedIn = true; + } + } + } + if (!done + && (v == null || Token.isRemovedFromDisk(v) && !region.isIndexCreationThread())) { + synchronized (entry) { + v = entry._getValueRetain(region, true); // TODO:KIRK:OK v = entry.getValueWithContext(region); + if (v == null) { + v = readValueFromDisk(entry, region); + if (entry instanceof LRUEntry) { + if (v != null && !Token.isInvalid(v)) { + lruEntryFaultIn((LRUEntry) entry, region); + + lruFaultedIn = true; + } + } + } + } + } + } finally { + if (!retainResult) { + v = OffHeapHelper.copyAndReleaseIfNeeded(v); + // At this point v should be either a heap object + } + } + if (Token.isRemoved(v)) { + // fix for bug 31800 + v = null; + } else { + ((RegionEntry)entry).setRecentlyUsed(); + } + if (lruFaultedIn) { + lruUpdateCallback(region); + } + return v; // OFFHEAP: the value ends up being returned by RegionEntry.getValue + } + + public static void recoverValue(DiskEntry entry, long oplogId, DiskRecoveryStore recoveryStore, ByteArrayDataInput in) { + boolean lruFaultedIn = false; + synchronized (entry) { + if (entry.isValueNull()) { + DiskId did = entry.getDiskId(); + if (did != null) { + Object value = null; + DiskRecoveryStore region = recoveryStore; + DiskRegionView dr = region.getDiskRegionView(); + dr.acquireReadLock(); + try { + synchronized (did) { + // don't read if the oplog has changed. + if (oplogId == did.getOplogId()) { + value = getValueFromDisk(dr, did, in); + if (value != null) { + setValueOnFaultIn(value, did, entry, dr, region); + } + } + } + } finally { + dr.releaseReadLock(); + } + if (entry instanceof LRUEntry) { + if (value != null && !Token.isInvalid(value)) { + lruEntryFaultIn((LRUEntry) entry, recoveryStore); + lruFaultedIn = true; + } + } + } + } + } + if (lruFaultedIn) { + lruUpdateCallback(recoveryStore); + } + } + + /** + * Caller must have "did" synced. 
+ */ + private static Object getValueFromDisk(DiskRegionView dr, DiskId did, ByteArrayDataInput in) { + Object value; + if (dr.isBackup() && did.getKeyId() == DiskRegion.INVALID_ID) { + // must have been destroyed + value = null; + } else { + if (did.isKeyIdNegative()) { + did.setKeyId(- did.getKeyId()); + } + // if a bucket region then create a CachedDeserializable here instead of object + value = dr.getRaw(did); // fix bug 40192 + if (value instanceof BytesAndBits) { + BytesAndBits bb = (BytesAndBits)value; + if (EntryBits.isInvalid(bb.getBits())) { + value = Token.INVALID; + } else if (EntryBits.isLocalInvalid(bb.getBits())) { + value = Token.LOCAL_INVALID; + } else if (EntryBits.isTombstone(bb.getBits())) { + value = Token.TOMBSTONE; + } else if (EntryBits.isSerialized(bb.getBits())) { + value = readSerializedValue(bb.getBytes(), bb.getVersion(), in, false); + } else { + value = readRawValue(bb.getBytes(), bb.getVersion(), in); + } + } + } + return value; + } + + private static void lruUpdateCallback(DiskRecoveryStore recoveryStore) { + /* + * Used conditional check to see if + * if its a LIFO Enabled, + * yes then disable lruUpdateCallback() + * and called updateStats() + * its keep track of actual entries + * present in memory - useful when + * checking capacity constraint + */ + try { + if (recoveryStore.getEvictionAttributes() != null + && recoveryStore.getEvictionAttributes().getAlgorithm().isLIFO()) { + ((VMLRURegionMap) recoveryStore.getRegionMap()).updateStats(); + return; + } + // this must be done after releasing synchronization + recoveryStore.getRegionMap().lruUpdateCallback(); + }catch( DiskAccessException dae) { + recoveryStore.handleDiskAccessException(dae); + throw dae; + } + } + + private static void lruEntryFaultIn(LRUEntry entry, DiskRecoveryStore recoveryStore) { + RegionMap rm = (RegionMap)recoveryStore.getRegionMap(); + try { + rm.lruEntryFaultIn((LRUEntry) entry); + }catch(DiskAccessException dae) { + recoveryStore.handleDiskAccessException(dae); + throw dae; + } + } + + /** + * Returns the value of this map entry, reading it from disk, if necessary. + * Sets the value in the entry. + * This is only called by the faultIn code once it has determined that + * the value is no longer in memory. + * return the result will only be off-heap if the value is a sqlf ByteSource. Otherwise result will be on-heap. + * Caller must have "entry" synced. + */ + @Retained + private static Object readValueFromDisk(DiskEntry entry, DiskRecoveryStore region) { + + DiskRegionView dr = region.getDiskRegionView(); + DiskId did = entry.getDiskId(); + if (did == null) { + return null; + } + dr.acquireReadLock(); + try { + synchronized (did) { + Object value = getValueFromDisk(dr, did, null); + if (value == null) return null; + @Unretained Object preparedValue = setValueOnFaultIn(value, did, entry, dr, region); + // For Sqlfire we want to return the offheap representation. + // So we need to retain it for the caller to release. + /*if (preparedValue instanceof ByteSource) { + // This is the only case in which we return a retained off-heap ref. + ((ByteSource)preparedValue).retain(); + return preparedValue; + } else */{ + return value; + } + } + } finally { + dr.releaseReadLock(); + } + } + + /** + * Caller must have "entry" and "did" synced and "dr" readLocked. + * @return the unretained result must be used by the caller before it releases the sync on "entry". 
+ */ + @Unretained + private static Object setValueOnFaultIn(Object value, DiskId did, DiskEntry entry, DiskRegionView dr, DiskRecoveryStore region) { + // dr.getOwner().getCache().getLogger().info("DEBUG: faulting in entry with key " + entry.getKey()); + int bytesOnDisk = getValueLength(did); + // Retained by the prepareValueForCache call for the region entry. + // NOTE that we return this value unretained because the retain is owned by the region entry not the caller. + @Retained Object preparedValue = entry.prepareValueForCache((RegionEntryContext) region, value, + false); + region.updateSizeOnFaultIn(entry.getKey(), region.calculateValueSize(preparedValue), bytesOnDisk); + //did.setValueSerializedSize(0); + // I think the following assertion is true but need to run + // a regression with it. Reenable this post 6.5 + //Assert.assertTrue(entry._getValue() == null); + entry.setValueWithContext((RegionEntryContext) region, preparedValue); + dr.incNumEntriesInVM(1L); + dr.incNumOverflowOnDisk(-1L); + dr.incNumOverflowBytesOnDisk(-bytesOnDisk); + incrementBucketStats(region, 1/*InVM*/, -1/*OnDisk*/, -bytesOnDisk); + return preparedValue; + } + + static Object readSerializedValue(byte[] valueBytes, Version version, + ByteArrayDataInput in, boolean forceDeserialize) { + if (forceDeserialize) { + // deserialize checking for product version change + return EntryEventImpl.deserialize(valueBytes, version, in); + } + else { + // TODO: upgrades: is there a case where GemFire values are internal + // ones that need to be upgraded transparently; probably messages + // being persisted (gateway events?) + return CachedDeserializableFactory.create(valueBytes); + } + } + + static Object readRawValue(byte[] valueBytes, Version version, + ByteArrayDataInput in) { + /* + final StaticSystemCallbacks sysCb; + if (version != null && (sysCb = GemFireCacheImpl.FactoryStatics + .systemCallbacks) != null) { + // may need to change serialized shape for SQLFire + return sysCb.fromVersion(valueBytes, false, version, in); + } + else */ { + return valueBytes; + } + } + + public static void incrementBucketStats(Object owner, + int entriesInVmDelta, + int overflowOnDiskDelta, + int overflowBytesOnDiskDelta) { + if (owner instanceof BucketRegion) { + ((BucketRegion)owner).incNumEntriesInVM(entriesInVmDelta); + ((BucketRegion)owner).incNumOverflowOnDisk(overflowOnDiskDelta); + ((BucketRegion)owner).incNumOverflowBytesOnDisk(overflowBytesOnDiskDelta); + } else if (owner instanceof DiskRegionView) { + ((DiskRegionView)owner).incNumOverflowBytesOnDisk(overflowBytesOnDiskDelta); + } + } + + /** + * Writes the value of this DiskEntry to disk and + * null s out the reference to the value to free up VM space. + *

+ * Note that if the value had already been written to disk, it is not + * written again. + *

+ * Caller must synchronize on entry and it is assumed the entry is evicted + * + * see #writeToDisk + * @throws RegionClearedException + */ + public static int overflowToDisk(DiskEntry entry, LocalRegion region, EnableLRU ccHelper) throws RegionClearedException { + { + Token entryVal = entry.getValueAsToken(); + if (entryVal == null || Token.isRemovedFromDisk(entryVal)) { + // Note it could be removed token now because + // freeAllEntriesOnDisk is not able to sync on entry + return 0; + } + } + DiskRegion dr = region.getDiskRegion(); + final int oldSize = region.calculateRegionEntryValueSize(entry);; + //Asif:Get diskID . If it is null, it implies it is + // overflow only mode. + //long id = entry.getDiskId().getKeyId(); + DiskId did = entry.getDiskId(); + if (did == null) { + ((LRUEntry)entry).setDelayedDiskId(region); + did = entry.getDiskId(); + } + + // Notify the SQLFire IndexManager if present + /* final IndexUpdater indexUpdater = region.getIndexUpdater(); + if(indexUpdater != null && dr.isSync()) { + indexUpdater.onOverflowToDisk(entry); + }*/ + + int change = 0; + boolean scheduledAsyncHere = false; + dr.acquireReadLock(); + try { + synchronized (did) { + // check for a concurrent freeAllEntriesOnDisk + if (entry.isRemovedFromDisk()) { + return 0; + } + + //TODO:Asif: Check if we need to overflow even when id is = 0 + boolean wasAlreadyPendingAsync = did.isPendingAsync(); + if (did.needsToBeWritten()) { + if (dr.isSync()) { + writeToDisk(entry, region, false); + } else if (!wasAlreadyPendingAsync) { + scheduledAsyncHere = true; + did.setPendingAsync(true); + } else { + // it may have been scheduled to be written (isBackup==true) + // and now we are faulting it out + } + } + + boolean movedValueToDisk = false; // added for bug 41849 + + // If async then if it does not need to be written (because it already was) + // then treat it like the sync case. This fixes bug 41310 + if (scheduledAsyncHere || wasAlreadyPendingAsync) { + // we call _setValue(null) after it is actually written to disk + change = entry.updateAsyncEntrySize(ccHelper); + // do the stats when it is actually written to disk + } else { + region.updateSizeOnEvict(entry.getKey(), oldSize); + //did.setValueSerializedSize(byteSizeOnDisk); + try { + entry.handleValueOverflow(region); + entry.setValueWithContext(region,null); + }finally { + entry.afterValueOverflow(region); + } + movedValueToDisk = true; + change = ((LRUClockNode)entry).updateEntrySize(ccHelper); + } + int valueLength = 0; + if (movedValueToDisk) { + valueLength = getValueLength(did); + } + dr.incNumEntriesInVM(-1L); + dr.incNumOverflowOnDisk(1L); + dr.incNumOverflowBytesOnDisk(valueLength); + incrementBucketStats(region, -1/*InVM*/, 1/*OnDisk*/, valueLength); + } + } finally { + dr.releaseReadLock(); + } + if (scheduledAsyncHere && did.isPendingAsync()) { + // this needs to be done outside the above sync + // the version tag is null here because this method only needs + // to write to disk for overflow only regions, which do not need + // to maintain an RVV on disk. 
+       scheduleAsyncWrite(new AsyncDiskEntry(region, entry, null));
+     }
+     return change;
+   }
+
+   private static void scheduleAsyncWrite(AsyncDiskEntry ade) {
+     DiskRegion dr = ade.region.getDiskRegion();
+     dr.scheduleAsyncWrite(ade);
+   }
+
+
+   public static void handleFullAsyncQueue(DiskEntry entry, LocalRegion region, VersionTag tag) {
+     DiskRegion dr = region.getDiskRegion();
+     DiskId did = entry.getDiskId();
+     synchronized (entry) {
+       dr.acquireReadLock();
+       try {
+         synchronized (did) {
+           if (did.isPendingAsync()) {
+             did.setPendingAsync(false);
+             final Token entryVal = entry.getValueAsToken();
+             final int entryValSize = region.calculateRegionEntryValueSize(entry);
+             boolean remove = false;
+             try {
+               if (Token.isRemovedFromDisk(entryVal)) {
+                 // onDisk was already decremented so just do the valueLength here
+                 dr.incNumOverflowBytesOnDisk(-did.getValueLength());
+                 incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
+                     -did.getValueLength());
+                 dr.remove(region, entry, true, false);
+                 if (dr.isBackup()) {
+                   did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
+                 }
+                 remove = true;
+               } else if (Token.isInvalid(entryVal) && !dr.isBackup()) {
+                 // no need to write invalid to disk if overflow only
+               } else if (entryVal != null) {
+                 writeToDisk(entry, region, true);
+               } else {
+                 // if we have a version tag we need to record the operation
+                 // to update the RVV
+                 if (tag != null) {
+                   DiskEntry.Helper.doAsyncFlush(tag, region);
+                 }
+                 return;
+               }
+               assert !dr.isSync();
+               // Only setValue to null if this was an evict.
+               // We could just be a backup that is writing async.
+               if (!remove
+                   && !Token.isInvalid(entryVal)
+                   && entry instanceof LRUEntry
+                   && ((LRUEntry)entry).testEvicted()) {
+                 // Moved this here to fix bug 40116.
+                 region.updateSizeOnEvict(entry.getKey(), entryValSize);
+                 // note the old size was already accounted for
+                 // onDisk was already incremented so just do the valueLength here
+                 dr.incNumOverflowBytesOnDisk(did.getValueLength());
+                 incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
+                     did.getValueLength());
+                 try {
+                   entry.handleValueOverflow(region);
+                   entry.setValueWithContext(region, null);
+                 } finally {
+                   entry.afterValueOverflow(region);
+                 }
+               }
+
+               // See if the entry we wrote to disk has the same tag as this entry.
+               // If not, write the tag as a conflicting operation to update the RVV.
+               VersionStamp stamp = entry.getVersionStamp();
+               if (tag != null && stamp != null
+                   && (stamp.getMemberID() != tag.getMemberID()
+                       || stamp.getRegionVersion() != tag.getRegionVersion())) {
+                 DiskEntry.Helper.doAsyncFlush(tag, region);
+               }
+             } catch (RegionClearedException ignore) {
+               // no need to do the op since it was clobbered by a region clear
+             }
+           } else {
+             // if we have a version tag we need to record the operation
+             // to update the RVV, even if we don't write the entry
+             if (tag != null) {
+               DiskEntry.Helper.doAsyncFlush(tag, region);
+             }
+           }
+         }
+       } finally {
+         dr.releaseReadLock();
+       }
+     } // sync entry
+   }
+
+   public static void doAsyncFlush(VersionTag tag, LocalRegion region) {
+     if (region.isThisRegionBeingClosedOrDestroyed()) return;
+     DiskRegion dr = region.getDiskRegion();
+     if (!dr.isBackup()) {
+       return;
+     }
+     assert !dr.isSync();
+     dr.acquireReadLock();
+     try {
+       dr.getDiskStore().putVersionTagOnly(region, tag, true);
+     } finally {
+       dr.releaseReadLock();
+     }
+   }
+
+   /**
+    * Flush an entry that was previously scheduled to be written to disk.
+    * @param tag
+    * @since prPersistSprint1
+    */
+   public static void doAsyncFlush(DiskEntry entry, LocalRegion region, VersionTag tag) {
+     if (region.isThisRegionBeingClosedOrDestroyed()) return;
+     DiskRegion dr = region.getDiskRegion();
+     dr.setClearCountReference();
+     synchronized (entry) { // fixes 40116
+       // If I don't sync the entry and this method ends up doing an eviction,
+       // thus setting the value to null, some other thread is free to fetch the
+       // value while the entry is synced and think it has removed it or replaced
+       // it. This results in updateSizeOn* being called twice for the same value
+       // (once when it is evicted and once when it is removed/updated).
+       try {
+         dr.acquireReadLock();
+         try {
+           DiskId did = entry.getDiskId();
+           synchronized (did) {
+             if (did.isPendingAsync()) {
+               did.setPendingAsync(false);
+               final Token entryVal = entry.getValueAsToken();
+               final int entryValSize = region.calculateRegionEntryValueSize(entry);
+               boolean remove = false;
+               try {
+                 if (Token.isRemovedFromDisk(entryVal)) {
+                   if (region.isThisRegionBeingClosedOrDestroyed()) return;
+                   // onDisk was already decremented so just do the valueLength here
+                   dr.incNumOverflowBytesOnDisk(-did.getValueLength());
+                   incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
+                       -did.getValueLength());
+                   dr.remove(region, entry, true, false);
+                   if (dr.isBackup()) {
+                     did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
+                   }
+                   remove = true;
+                 } else if ((Token.isInvalid(entryVal) || entryVal == Token.TOMBSTONE) && !dr.isBackup()) {
+                   // no need to write invalid or tombstones to disk if overflow only
+                 } else if (entryVal != null) {
+                   writeToDisk(entry, region, true);
+                 } else {
+                   // @todo why would we have a null value here?
+                   // I'm seeing it show up in tests:
+                   // java.lang.IllegalArgumentException: Must not serialize null in this context.
+                   // at com.gemstone.gemfire.internal.cache.EntryEventImpl.serialize(EntryEventImpl.java:1024)
+                   // at com.gemstone.gemfire.internal.cache.DiskEntry$Helper.writeToDisk(DiskEntry.java:351)
+                   // at com.gemstone.gemfire.internal.cache.DiskEntry$Helper.doAsyncFlush(DiskEntry.java:683)
+                   // at com.gemstone.gemfire.internal.cache.DiskRegion$FlusherThread.run(DiskRegion.java:1055)
+                   // if we have a version tag we need to record the operation
+                   // to update the RVV
+                   if (tag != null) {
+                     DiskEntry.Helper.doAsyncFlush(tag, region);
+                   }
+                   return;
+                 }
+                 assert !dr.isSync();
+                 // Only setValue to null if this was an evict.
+                 // We could just be a backup that is writing async.
+                 if (!remove
+                     && !Token.isInvalid(entryVal)
+                     && (entryVal != Token.TOMBSTONE)
+                     && entry instanceof LRUEntry
+                     && ((LRUEntry)entry).testEvicted()) {
+                   // Moved this here to fix bug 40116.
+                   region.updateSizeOnEvict(entry.getKey(), entryValSize);
+                   // note the old size was already accounted for
+                   // onDisk was already incremented so just do the valueLength here
+                   dr.incNumOverflowBytesOnDisk(did.getValueLength());
+                   incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
+                       did.getValueLength());
+                   try {
+                     entry.handleValueOverflow(region);
+                     entry.setValueWithContext(region, null);
+                   } finally {
+                     entry.afterValueOverflow(region);
+                   }
+                 }
+               } catch (RegionClearedException ignore) {
+                 // no need to do the op since it was clobbered by a region clear
+               }
+
+               // See if the entry we wrote to disk has the same tag as this entry.
+               // If not, write the tag as a conflicting operation to update the RVV.
+               VersionStamp stamp = entry.getVersionStamp();
+               if (tag != null && stamp != null
+                   && (stamp.getMemberID() != tag.getMemberID()
+                       || stamp.getRegionVersion() != tag.getRegionVersion())) {
+                 DiskEntry.Helper.doAsyncFlush(tag, region);
+               }
+             } else {
+               // if we have a version tag we need to record the operation
+               // to update the RVV
+               if (tag != null) {
+                 DiskEntry.Helper.doAsyncFlush(tag, region);
+               }
+             }
+           }
+         } finally {
+           dr.releaseReadLock();
+         }
+       } finally {
+         dr.removeClearCountReference();
+       }
+     } // sync entry
+   }
+
+   /**
+    * Removes the key/value pair in the given entry from disk
+    *
+    * @throws RegionClearedException If the operation is aborted due to a clear
+    * @see DiskRegion#remove
+    */
+   public static void removeFromDisk(DiskEntry entry, LocalRegion region, boolean isClear) throws RegionClearedException {
+     removeFromDisk(entry, region, true, false, isClear);
+   }
+   private static void removeFromDisk(DiskEntry entry, LocalRegion region,
+       boolean checkValue, boolean valueWasNull, boolean isClear) throws RegionClearedException {
+     DiskRegion dr = region.getDiskRegion();
+
+     // If we have concurrency checks enabled for a persistent region, we need
+     // to add an entry to the async queue for every update to maintain the RVV
+     boolean maintainRVV = region.concurrencyChecksEnabled && dr.isBackup();
+
+     DiskId did = entry.getDiskId();
+     VersionTag tag = null;
+     Object syncObj = did;
+     if (did == null) {
+       syncObj = entry;
+     }
+     boolean scheduledAsyncHere = false;
+     if (syncObj == did) {
+       dr.acquireReadLock();
+     }
+     try {
+       synchronized (syncObj) {
+
+         if (did == null || (dr.isBackup() && did.getKeyId() == DiskRegion.INVALID_ID)) {
+           // Not on disk yet
+           dr.incNumEntriesInVM(-1L);
+           incrementBucketStats(region, -1/*InVM*/, 0/*OnDisk*/, 0);
+           dr.unscheduleAsyncWrite(did);
+           return;
+         }
+         // Asif: This will convert the -ve OplogKeyId to positive as part of fixing
+         // Bug # 39989
+         did.unmarkForWriting();
+
+         //System.out.println("DEBUG: removeFromDisk doing remove(" + id + ")");
+         int oldValueLength = 0;
+         if (dr.isSync() || isClear) {
+           oldValueLength = did.getValueLength();
+           dr.remove(region, entry, false, isClear);
+           if (dr.isBackup()) {
+             did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
+           }
+           // If this is a clear, we should unschedule the async write for this
+           // entry
+           did.setPendingAsync(false);
+         } else {
+           if (!did.isPendingAsync() || maintainRVV) {
+             scheduledAsyncHere = true;
+             did.setPendingAsync(true);
+             VersionStamp stamp = entry.getVersionStamp();
+             if (stamp != null) {
+               tag = stamp.asVersionTag();
+             }
+           }
+         }
+         if (checkValue) {
+           valueWasNull = entry.isValueNull();
+           entry._removePhase1();
+         }
+         if (valueWasNull) {
+           dr.incNumOverflowOnDisk(-1L);
+           dr.incNumOverflowBytesOnDisk(-oldValueLength);
+           incrementBucketStats(region, 0/*InVM*/, -1/*OnDisk*/, -oldValueLength);
+         }
+         else {
+           dr.incNumEntriesInVM(-1L);
+           incrementBucketStats(region, -1/*InVM*/, 0/*OnDisk*/, 0);
+           if (!dr.isSync()) {
+             // we are going to do an async remove of an entry that is not currently
+             // overflowed to disk so we don't want to count its value length as being
+             // on disk when we finally do the async op. So we clear it here.
+             did.setValueLength(0);
+           }
+         }
+       }
+     } finally {
+       if (syncObj == did) {
+         dr.releaseReadLock();
+       }
+     }
+     if (scheduledAsyncHere && did.isPendingAsync()) {
+       // do this outside the sync
+       scheduleAsyncWrite(new AsyncDiskEntry(region, entry, tag));
+     }
+   }
+
+   /**
+    * @param entry
+    * @param region
+    * @param tag
+    */
+   public static void updateVersionOnly(DiskEntry entry, LocalRegion region,
+       VersionTag tag) {
+     DiskRegion dr = region.getDiskRegion();
+     if (!dr.isBackup()) {
+       return;
+     }
+
+     assert tag != null && tag.getMemberID() != null;
+     boolean scheduleAsync = false;
+     DiskId did = entry.getDiskId();
+     Object syncObj = did;
+     if (syncObj == null) {
+       syncObj = entry;
+     }
+     if (syncObj == did) {
+       dr.acquireReadLock();
+     }
+     try {
+       synchronized (syncObj) {
+         if (dr.isSync()) {
+           dr.getDiskStore().putVersionTagOnly(region, tag, false);
+         } else {
+           scheduleAsync = true;
+         }
+       }
+     } finally {
+       if (syncObj == did) {
+         dr.releaseReadLock();
+       }
+     }
+     if (scheduleAsync) {
+       // this needs to be done outside the above sync
+       scheduleAsyncWrite(new AsyncDiskEntry(region, tag));
+     }
+   }
+
+ }
+
+ /**
+  * A marker object for an entry that has been recovered from disk.
+  * It is handled specially when it is placed in a region.
+  */
+ public static class RecoveredEntry {
+
+   /** The disk id of the entry being recovered */
+   private final long recoveredKeyId;
+
+   /** The value of the recovered entry */
+   private final Object value;
+
+   private final long offsetInOplog;
+   private final byte userBits;
+   private final int valueLength;
+
+   private long oplogId;
+   private VersionTag tag;
+
+   /**
+    * Only for this constructor is the value not loaded into the region; it is still lying
+    * on the oplogs. Since oplogs rely on DiskId to furnish the user bits so as to correctly
+    * interpret the bytes, the user bits need to be set correctly here.
+    */
+   public RecoveredEntry(long keyId, long oplogId, long offsetInOplog,
+       byte userBits, int valueLength) {
+     this(-keyId, oplogId, offsetInOplog, userBits, valueLength, null);
+   }
+
+   public RecoveredEntry(long keyId, long oplogId, long offsetInOplog,
+       byte userBits, int valueLength, Object value) {
+     this.recoveredKeyId = keyId;
+     this.value = value;
+     this.oplogId = oplogId;
+     this.offsetInOplog = offsetInOplog;
+     this.userBits = EntryBits.setRecoveredFromDisk(userBits, true);
+     this.valueLength = valueLength;
+   }
+
+   /**
+    * Returns the disk id of the entry being recovered
+    */
+   public long getRecoveredKeyId() {
+     return this.recoveredKeyId;
+   }
+   /**
+    * Returns the value of the recovered entry. Note that if the
+    * disk id is < 0 then the value has not been faulted in and
+    * this method will return null.
+    */
+   public Object getValue() {
+     return this.value;
+   }
+   /**
+    * @return byte indicating the user bits. The correct value is returned only in the specific case
+    * of an entry recovered from an oplog (and not rolled to Htree) with the RECOVER_VALUES flag
+    * false. In other cases the exact value is not needed.
+    */
+   public byte getUserBits() {
+     return this.userBits;
+   }
+   public int getValueLength() {
+     return this.valueLength;
+   }
+   public long getOffsetInOplog() {
+     return offsetInOplog;
+   }
+   public long getOplogId() {
+     return this.oplogId;
+   }
+
+   public void setOplogId(long v) {
+     this.oplogId = v;
+   }
+   public VersionTag getVersionTag() {
+     return this.tag;
+   }
+   public void setVersionTag(VersionTag tag) {
+     this.tag = tag;
+   }
+ }
+ }
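A note for readers following the stats bookkeeping in this file: setValueOnFaultIn, overflowToDisk and removeFromDisk keep the in-VM and on-disk entry counters in step through incrementBucketStats. The short, self-contained sketch below only illustrates those deltas; the class and method names in it are made up for illustration and are not Geode APIs.

    // Illustrative only -- models the stat deltas applied via incrementBucketStats above.
    public class OverflowAccountingSketch {

      // hypothetical counters standing in for the DiskRegion/BucketRegion stats
      private long entriesInVM;
      private long overflowOnDisk;
      private long overflowBytesOnDisk;

      /** Entry evicted from memory to disk: -1 in VM, +1 on disk, +valueLength bytes. */
      public void onOverflowToDisk(int valueLength) {
        entriesInVM -= 1;
        overflowOnDisk += 1;
        overflowBytesOnDisk += valueLength;
      }

      /** Entry faulted back in from disk: +1 in VM, -1 on disk, -bytesOnDisk bytes. */
      public void onFaultIn(int bytesOnDisk) {
        entriesInVM += 1;
        overflowOnDisk -= 1;
        overflowBytesOnDisk -= bytesOnDisk;
      }

      /** Entry removed while its value lived only on disk: -1 on disk, -oldValueLength bytes. */
      public void onRemoveOfOverflowedEntry(int oldValueLength) {
        overflowOnDisk -= 1;
        overflowBytesOnDisk -= oldValueLength;
      }

      public static void main(String[] args) {
        OverflowAccountingSketch stats = new OverflowAccountingSketch();
        stats.onOverflowToDisk(128);   // evict a 128-byte value
        stats.onFaultIn(128);          // read it back into the VM; counters return to zero
        System.out.println(stats.entriesInVM + " " + stats.overflowOnDisk + " " + stats.overflowBytesOnDisk);
      }
    }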
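Similarly, RecoveredEntry uses the sign of recoveredKeyId to mark whether the value was recovered along with the key (see the getValue() javadoc above): the five-argument constructor negates keyId and leaves the value null. The minimal sketch below only mimics that convention; the class and method names are hypothetical, not Geode APIs.

    // Illustrative only -- a negative key id means "value still on the oplog, not yet faulted in".
    public class RecoveredEntrySketch {
      private final long recoveredKeyId;
      private final Object value; // null until the value is faulted in

      // value not recovered: store the negated key id and no value
      public RecoveredEntrySketch(long keyId) {
        this(-keyId, null);
      }

      // value recovered along with the key
      public RecoveredEntrySketch(long keyId, Object value) {
        this.recoveredKeyId = keyId;
        this.value = value;
      }

      public boolean valueStillOnDisk() {
        return this.recoveredKeyId < 0; // matches the getValue() javadoc convention
      }

      public static void main(String[] args) {
        System.out.println(new RecoveredEntrySketch(42L).valueStillOnDisk());            // true
        System.out.println(new RecoveredEntrySketch(42L, "payload").valueStillOnDisk()); // false
      }
    }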