Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 43C2A19EC1 for ; Wed, 27 Apr 2016 21:16:55 +0000 (UTC) Received: (qmail 60741 invoked by uid 500); 27 Apr 2016 21:16:55 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 60712 invoked by uid 500); 27 Apr 2016 21:16:55 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 60702 invoked by uid 99); 27 Apr 2016 21:16:55 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Apr 2016 21:16:55 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 89E7B1A130D for ; Wed, 27 Apr 2016 21:16:54 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id OeX2fL51evPo for ; Wed, 27 Apr 2016 21:16:39 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 57FC05F5D3 for ; Wed, 27 Apr 2016 21:16:35 +0000 (UTC) Received: (qmail 58165 invoked by uid 99); 27 Apr 2016 21:16:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Apr 2016 21:16:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 40072E0593; Wed, 27 Apr 2016 21:16:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jinmeiliao@apache.org To: commits@geode.incubator.apache.org Date: Wed, 27 Apr 2016 21:16:42 -0000 Message-Id: <4ea286d1c28a43c992a3e69765363965@git.apache.org> In-Reply-To: <3834ea97aa804fcab3c9d8446bb91fa6@git.apache.org> References: <3834ea97aa804fcab3c9d8446bb91fa6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [10/22] incubator-geode git commit: GEODE-1072: Removing HDFS related code http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java index 74efd51..c5b5d3a 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java @@ -96,8 +96,6 @@ public interface GatewaySender { public static final int DEFAULT_DISPATCHER_THREADS = 5; - public static final int DEFAULT_HDFS_DISPATCHER_THREADS = 5; - public static final OrderPolicy DEFAULT_ORDER_POLICY = OrderPolicy.KEY; /** * The default maximum amount of memory (MB) to allow in the queue before http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java index 77f24a3..bd78f5a 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java @@ -52,7 +52,6 @@ import com.gemstone.gemfire.cache.client.internal.locator.LocatorStatusRequest; import com.gemstone.gemfire.cache.client.internal.locator.LocatorStatusResponse; import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionRequest; import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionResponse; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl; import com.gemstone.gemfire.cache.query.QueryService; import com.gemstone.gemfire.cache.query.internal.CqEntry; import com.gemstone.gemfire.cache.query.internal.CumulativeNonDistinctResults; @@ -1023,8 +1022,6 @@ public final class DSFIDFactory implements DataSerializableFixedID { RemoteFetchVersionMessage.FetchVersionReplyMessage.class); registerDSFID(RELEASE_CLEAR_LOCK_MESSAGE, ReleaseClearLockMessage.class); registerDSFID(PR_TOMBSTONE_MESSAGE, PRTombstoneMessage.class); - registerDSFID(HDFS_GATEWAY_EVENT_IMPL, HDFSGatewayEventImpl.class); - registerDSFID(REQUEST_RVV_MESSAGE, InitialImageOperation.RequestRVVMessage.class); registerDSFID(RVV_REPLY_MESSAGE, InitialImageOperation.RVVReplyMessage.class); registerDSFID(SNAPPY_COMPRESSED_CACHED_DESERIALIZABLE, SnappyCompressedCachedDeserializable.class); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java index 5d52346..7427f90 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java @@ -103,7 +103,6 @@ public interface DataSerializableFixedID extends SerializationVersions { public static final short JOIN_RESPONSE = -143; public static final short JOIN_REQUEST = -142; - public static final short HDFS_GATEWAY_EVENT_IMPL = -141; public static final short SNAPPY_COMPRESSED_CACHED_DESERIALIZABLE = -140; public static final short GATEWAY_EVENT_IMPL = -136; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java index 4d4197e..f8740db 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java @@ -33,7 +33,6 @@ import com.gemstone.gemfire.cache.CacheLoader; import com.gemstone.gemfire.cache.CacheLoaderException; import com.gemstone.gemfire.cache.CacheWriter; import com.gemstone.gemfire.cache.CacheWriterException; -import com.gemstone.gemfire.cache.CustomEvictionAttributes; import com.gemstone.gemfire.cache.CustomExpiry; import com.gemstone.gemfire.cache.DataPolicy; import com.gemstone.gemfire.cache.Declarable; @@ -50,10 +49,7 @@ import com.gemstone.gemfire.cache.RegionAttributes; import com.gemstone.gemfire.cache.RegionEvent; import com.gemstone.gemfire.cache.Scope; import com.gemstone.gemfire.cache.SubscriptionAttributes; -import com.gemstone.gemfire.compression.CompressionException; import com.gemstone.gemfire.compression.Compressor; -import com.gemstone.gemfire.internal.InternalDataSerializer; -import com.gemstone.gemfire.internal.Version; import com.gemstone.gemfire.internal.cache.EvictionAttributesImpl; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; @@ -108,8 +104,6 @@ public class RemoteRegionAttributes implements RegionAttributes, private String[] gatewaySendersDescs; private boolean isGatewaySenderEnabled = false; private String[] asyncEventQueueDescs; - private String hdfsStoreName; - private boolean hdfsWriteOnly; private String compressorDesc; private boolean offHeap; @@ -161,8 +155,6 @@ public class RemoteRegionAttributes implements RegionAttributes, this.isDiskSynchronous = attr.isDiskSynchronous(); this.gatewaySendersDescs = getDescs(attr.getGatewaySenderIds().toArray()); this.asyncEventQueueDescs = getDescs(attr.getAsyncEventQueueIds().toArray()); - this.hdfsStoreName = attr.getHDFSStoreName(); - this.hdfsWriteOnly = attr.getHDFSWriteOnly(); this.compressorDesc = getDesc(attr.getCompressor()); this.offHeap = attr.getOffHeap(); } @@ -419,7 +411,6 @@ public class RemoteRegionAttributes implements RegionAttributes, DataSerializer.writeString(this.compressorDesc, out); out.writeBoolean(this.offHeap); - DataSerializer.writeString(this.hdfsStoreName, out); } public void fromData(DataInput in) throws IOException, ClassNotFoundException { @@ -468,7 +459,6 @@ public class RemoteRegionAttributes implements RegionAttributes, this.compressorDesc = DataSerializer.readString(in); this.offHeap = in.readBoolean(); - this.hdfsStoreName = DataSerializer.readString(in); } private String[] getDescs(Object[] l) { @@ -636,15 +626,6 @@ public class RemoteRegionAttributes implements RegionAttributes, return this.evictionAttributes; } - /** - * {@inheritDoc} - */ - @Override - public CustomEvictionAttributes getCustomEvictionAttributes() { - // TODO: HDFS: no support for custom eviction attributes from remote yet - return null; - } - public boolean getCloningEnabled() { // TODO Auto-generated method stub return this.cloningEnable; @@ -653,12 +634,6 @@ public class RemoteRegionAttributes implements RegionAttributes, public String getDiskStoreName() { return this.diskStoreName; } - public String getHDFSStoreName() { - return this.hdfsStoreName; - } - public boolean getHDFSWriteOnly() { - return this.hdfsWriteOnly; - } public boolean isDiskSynchronous() { return this.isDiskSynchronous; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java index 1f8da88..92eaa01 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java @@ -34,8 +34,6 @@ import com.gemstone.gemfire.cache.Operation; import com.gemstone.gemfire.cache.RegionAttributes; import com.gemstone.gemfire.cache.RegionDestroyedException; import com.gemstone.gemfire.cache.TimeoutException; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl; import com.gemstone.gemfire.internal.cache.lru.LRUStatistics; import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector; import com.gemstone.gemfire.internal.cache.versions.VersionSource; @@ -459,17 +457,8 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { } waitIfQueueFull(); - int sizeOfHdfsEvent = -1; try { - if (this instanceof HDFSBucketRegionQueue) { - // need to fetch the size before event is inserted in queue. - // fix for #50016 - if (this.getBucketAdvisor().isPrimary()) { - HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)event.getValue(); - sizeOfHdfsEvent = hdfsEvent.getSizeOnHDFSInBytes(!((HDFSBucketRegionQueue)this).isBucketSorted); - } - } - + didPut = virtualPut(event, false, false, null, false, startPut, true); checkReadiness(); @@ -492,7 +481,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { destroyKey(key); didPut = false; } else { - addToEventQueue(key, didPut, event, sizeOfHdfsEvent); + addToEventQueue(key, didPut, event); } return didPut; } @@ -522,8 +511,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { } protected abstract void clearQueues(); - protected abstract void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, - int sizeOfHdfsEvent); + protected abstract void addToEventQueue(Object key, boolean didPut, EntryEventImpl event); @Override public void afterAcquiringPrimaryState() { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java index 10644cb..d37f025 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.logging.log4j.Logger; @@ -46,7 +45,6 @@ import com.gemstone.gemfire.cache.CacheLoaderException; import com.gemstone.gemfire.cache.CacheStatistics; import com.gemstone.gemfire.cache.CacheWriter; import com.gemstone.gemfire.cache.CacheWriterException; -import com.gemstone.gemfire.cache.CustomEvictionAttributes; import com.gemstone.gemfire.cache.CustomExpiry; import com.gemstone.gemfire.cache.DataPolicy; import com.gemstone.gemfire.cache.DiskWriteAttributes; @@ -54,7 +52,6 @@ import com.gemstone.gemfire.cache.EntryExistsException; import com.gemstone.gemfire.cache.EntryNotFoundException; import com.gemstone.gemfire.cache.EvictionAttributes; import com.gemstone.gemfire.cache.EvictionAttributesMutator; -import com.gemstone.gemfire.cache.EvictionCriteria; import com.gemstone.gemfire.cache.ExpirationAction; import com.gemstone.gemfire.cache.ExpirationAttributes; import com.gemstone.gemfire.cache.MembershipAttributes; @@ -100,7 +97,6 @@ import com.gemstone.gemfire.internal.logging.LogService; import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; import com.gemstone.gemfire.internal.util.ArrayUtils; import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration; -import com.google.common.util.concurrent.Service.State; /** * Takes care of RegionAttributes, AttributesMutator, and some no-brainer method @@ -236,8 +232,6 @@ public abstract class AbstractRegion implements Region, RegionAttributes, protected EvictionAttributesImpl evictionAttributes = new EvictionAttributesImpl(); - protected CustomEvictionAttributes customEvictionAttributes; - /** The membership attributes defining required roles functionality */ protected MembershipAttributes membershipAttributes; @@ -260,10 +254,6 @@ public abstract class AbstractRegion implements Region, RegionAttributes, protected String poolName; - protected String hdfsStoreName; - - protected boolean hdfsWriteOnly; - protected Compressor compressor; /** @@ -898,16 +888,6 @@ public abstract class AbstractRegion implements Region, RegionAttributes, return this.subscriptionAttributes; } - @Override - public final String getHDFSStoreName() { - return this.hdfsStoreName; - } - - @Override - public final boolean getHDFSWriteOnly() { - return this.hdfsWriteOnly; - } - /** * Get IndexManger for region */ @@ -1728,7 +1708,6 @@ public abstract class AbstractRegion implements Region, RegionAttributes, this.setEvictionController(this.evictionAttributes .createEvictionController(this, attrs.getOffHeap())); } - this.customEvictionAttributes = attrs.getCustomEvictionAttributes(); storeCacheListenersField(attrs.getCacheListeners()); assignCacheLoader(attrs.getCacheLoader()); assignCacheWriter(attrs.getCacheWriter()); @@ -1786,8 +1765,6 @@ public abstract class AbstractRegion implements Region, RegionAttributes, + "when multiuser-authentication is true."); } } - this.hdfsStoreName = attrs.getHDFSStoreName(); - this.hdfsWriteOnly = attrs.getHDFSWriteOnly(); this.diskStoreName = attrs.getDiskStoreName(); this.isDiskSynchronous = attrs.isDiskSynchronous(); @@ -1853,52 +1830,12 @@ public abstract class AbstractRegion implements Region, RegionAttributes, return this.evictionAttributes; } - /** - * {@inheritDoc} - */ - @Override - public CustomEvictionAttributes getCustomEvictionAttributes() { - return this.customEvictionAttributes; - } - public EvictionAttributesMutator getEvictionAttributesMutator() { return this.evictionAttributes; } - /** - * {@inheritDoc} - */ - @Override - public CustomEvictionAttributes setCustomEvictionAttributes(long newStart, - long newInterval) { - checkReadiness(); - - if (this.customEvictionAttributes == null) { - throw new IllegalArgumentException( - LocalizedStrings.AbstractRegion_NO_CUSTOM_EVICTION_SET - .toLocalizedString(getFullPath())); - } - - if (newStart == 0) { - newStart = this.customEvictionAttributes.getEvictorStartTime(); - } - this.customEvictionAttributes = new CustomEvictionAttributesImpl( - this.customEvictionAttributes.getCriteria(), newStart, newInterval, - newStart == 0 && newInterval == 0); - -// if (this.evService == null) { -// initilializeCustomEvictor(); -// } else {// we are changing the earlier one which is already started. -// EvictorService service = getEvictorTask(); -// service.changeEvictionInterval(newInterval); -// if (newStart != 0) -// service.changeStartTime(newStart); -// } - return this.customEvictionAttributes; - } - public void setEvictionController(LRUAlgorithm evictionController) { this.evictionController = evictionController; @@ -2037,7 +1974,6 @@ public abstract class AbstractRegion implements Region, RegionAttributes, /** * @since 8.1 - * property used to find region operations that reach out to HDFS multiple times */ @Override public ExtensionPoint> getExtensionPoint() { @@ -2047,87 +1983,4 @@ public abstract class AbstractRegion implements Region, RegionAttributes, public boolean getOffHeap() { return this.offHeap; } - /** - * property used to find region operations that reach out to HDFS multiple times - */ - private static final boolean DEBUG_HDFS_CALLS = Boolean.getBoolean("DebugHDFSCalls"); - - /** - * throws exception if region operation goes out to HDFS multiple times - */ - private static final boolean THROW_ON_MULTIPLE_HDFS_CALLS = Boolean.getBoolean("throwOnMultipleHDFSCalls"); - - private ThreadLocal logHDFSCalls = DEBUG_HDFS_CALLS ? new ThreadLocal() : null; - - public void hdfsCalled(Object key) { - if (!DEBUG_HDFS_CALLS) { - return; - } - logHDFSCalls.get().addStack(new Throwable()); - logHDFSCalls.get().setKey(key); - } - public final void operationStart() { - if (!DEBUG_HDFS_CALLS) { - return; - } - if (logHDFSCalls.get() == null) { - logHDFSCalls.set(new CallLog()); - //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:operationStart", new Throwable()); - } else { - logHDFSCalls.get().incNestedCall(); - //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:incNestedCall:", new Throwable()); - } - } - public final void operationCompleted() { - if (!DEBUG_HDFS_CALLS) { - return; - } - //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:operationCompleted", new Throwable()); - if (logHDFSCalls.get() != null && logHDFSCalls.get().decNestedCall() < 0) { - logHDFSCalls.get().assertCalls(); - logHDFSCalls.set(null); - } - } - - public static class CallLog { - private List stackTraces = new ArrayList(); - private Object key; - private int nestedCall = 0; - public void incNestedCall() { - nestedCall++; - } - public int decNestedCall() { - return --nestedCall; - } - public void addStack(Throwable stack) { - this.stackTraces.add(stack); - } - public void setKey(Object key) { - this.key = key; - } - public void assertCalls() { - if (stackTraces.size() > 1) { - Throwable firstTrace = new Throwable(); - Throwable lastTrace = firstTrace; - for (Throwable t : this.stackTraces) { - lastTrace.initCause(t); - lastTrace = t; - } - if (THROW_ON_MULTIPLE_HDFS_CALLS) { - throw new RuntimeException("SWAP:For key:"+key+" HDFS get called more than once: ", firstTrace); - } else { - InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:For key:"+key+" HDFS get called more than once: ", firstTrace); - } - } - } - } - - public EvictionCriteria getEvictionCriteria() { - EvictionCriteria criteria = null; - if (this.customEvictionAttributes != null - && !this.customEvictionAttributes.isEvictIncoming()) { - criteria = this.customEvictionAttributes.getCriteria(); - } - return criteria; - } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java index b936e3f..46a851d 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java @@ -870,15 +870,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, removeEntry = true; } - // See #47887, we do not insert a tombstone for evicted HDFS - // entries since the value is still present in HDFS - // Check if we have to evict or just do destroy. - boolean forceRemoveEntry = - (event.isEviction() || event.isExpiration()) - && event.getRegion().isUsedForPartitionedRegionBucket() - && event.getRegion().getPartitionedRegion().isHDFSRegion(); - - if (removeEntry || forceRemoveEntry) { + if (removeEntry) { boolean isThisTombstone = isTombstone(); if(inTokenMode && !event.getOperation().isEviction()) { setValue(region, Token.DESTROYED); @@ -1398,27 +1390,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, /** * {@inheritDoc} */ - @Override - public final boolean isMarkedForEviction() { - return areAnyBitsSet(MARKED_FOR_EVICTION); - } - - /** - * {@inheritDoc} - */ - @Override - public final void setMarkedForEviction() { - setBits(MARKED_FOR_EVICTION); - } - /** - * {@inheritDoc} - */ - @Override - public final void clearMarkedForEviction() { - clearBits(~MARKED_FOR_EVICTION); - } - @Override public final synchronized void decRefCount(NewLRUClockHand lruList, LocalRegion lr) { if (TXManagerImpl.decRefCount(this)) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java index 3286373..75a1e32 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java @@ -18,7 +18,6 @@ package com.gemstone.gemfire.internal.cache; -import java.io.IOException; import java.lang.reflect.Method; import java.util.Collection; import java.util.HashSet; @@ -36,7 +35,6 @@ import com.gemstone.gemfire.InvalidDeltaException; import com.gemstone.gemfire.cache.CacheRuntimeException; import com.gemstone.gemfire.cache.CacheWriter; import com.gemstone.gemfire.cache.CacheWriterException; -import com.gemstone.gemfire.cache.CustomEvictionAttributes; import com.gemstone.gemfire.cache.DiskAccessException; import com.gemstone.gemfire.cache.EntryExistsException; import com.gemstone.gemfire.cache.EntryNotFoundException; @@ -83,9 +81,6 @@ import com.gemstone.gemfire.internal.offheap.annotations.Retained; import com.gemstone.gemfire.internal.offheap.annotations.Unretained; import com.gemstone.gemfire.internal.sequencelog.EntryLogger; import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap; -import com.gemstone.gemfire.pdx.PdxInstance; -import com.gemstone.gemfire.pdx.PdxSerializationException; -import com.gemstone.gemfire.pdx.internal.ConvertableToBytes; /** * Abstract implementation of {@link RegionMap}that has all the common @@ -303,10 +298,6 @@ public abstract class AbstractRegionMap implements RegionMap { public RegionEntry getEntry(Object key) { RegionEntry re = (RegionEntry)_getMap().get(key); - if (re != null && re.isMarkedForEviction()) { - // entry has been faulted in from HDFS - return null; - } return re; } @@ -337,16 +328,12 @@ public abstract class AbstractRegionMap implements RegionMap { @Override public final RegionEntry getOperationalEntryInVM(Object key) { RegionEntry re = (RegionEntry)_getMap().get(key); - if (re != null && re.isMarkedForEviction()) { - // entry has been faulted in from HDFS - return null; - } return re; } public final void removeEntry(Object key, RegionEntry re, boolean updateStat) { - if (re.isTombstone() && _getMap().get(key) == re && !re.isMarkedForEviction()){ + if (re.isTombstone() && _getMap().get(key) == re){ logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace")); return; // can't remove tombstones except from the tombstone sweeper } @@ -362,7 +349,7 @@ public abstract class AbstractRegionMap implements RegionMap { EntryEventImpl event, final LocalRegion owner, final IndexUpdater indexUpdater) { boolean success = false; - if (re.isTombstone()&& _getMap().get(key) == re && !re.isMarkedForEviction()) { + if (re.isTombstone()&& _getMap().get(key) == re) { logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace")); return; // can't remove tombstones except from the tombstone sweeper } @@ -371,18 +358,6 @@ public abstract class AbstractRegionMap implements RegionMap { indexUpdater.onEvent(owner, event, re); } - //This is messy, but custom eviction calls removeEntry - //rather than re.destroy I think to avoid firing callbacks, etc. - //However, the value still needs to be set to removePhase1 - //in order to remove the entry from disk. - if(event.isCustomEviction() && !re.isRemoved()) { - try { - re.removePhase1(owner, false); - } catch (RegionClearedException e) { - //that's ok, we were just trying to do evict incoming eviction - } - } - if (_getMap().remove(key, re)) { re.removePhase2(); success = true; @@ -1169,7 +1144,7 @@ public abstract class AbstractRegionMap implements RegionMap { // transaction conflict (caused by eviction) when the entry // is being added to transaction state. if (isEviction) { - if (!confirmEvictionDestroy(oldRe) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) { + if (!confirmEvictionDestroy(oldRe)) { opCompleted = false; return opCompleted; } @@ -1424,7 +1399,7 @@ public abstract class AbstractRegionMap implements RegionMap { // See comment above about eviction checks if (isEviction) { assert expectedOldValue == null; - if (!confirmEvictionDestroy(re) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) { + if (!confirmEvictionDestroy(re)) { opCompleted = false; return opCompleted; } @@ -1506,12 +1481,6 @@ public abstract class AbstractRegionMap implements RegionMap { } } // !isRemoved else { // already removed - if (owner.isHDFSReadWriteRegion() && re.isRemovedPhase2()) { - // For HDFS region there may be a race with eviction - // so retry the operation. fixes bug 49150 - retry = true; - continue; - } if (re.isTombstone() && event.getVersionTag() != null) { // if we're dealing with a tombstone and this is a remote event // (e.g., from cache client update thread) we need to update @@ -2685,11 +2654,7 @@ public abstract class AbstractRegionMap implements RegionMap { boolean onlyExisting, boolean returnTombstone) { Object key = event.getKey(); RegionEntry retVal = null; - if (event.isFetchFromHDFS()) { - retVal = getEntry(event); - } else { - retVal = getEntryInVM(key); - } + retVal = getEntry(event); if (onlyExisting) { if (!returnTombstone && (retVal != null && retVal.isTombstone())) { return null; @@ -2988,47 +2953,6 @@ public abstract class AbstractRegionMap implements RegionMap { else if (re != null && owner.isUsedForPartitionedRegionBucket()) { BucketRegion br = (BucketRegion)owner; CachePerfStats stats = br.getPartitionedRegion().getCachePerfStats(); - long startTime= stats.startCustomEviction(); - CustomEvictionAttributes csAttr = br.getCustomEvictionAttributes(); - // No need to update indexes if entry was faulted in but operation did not succeed. - if (csAttr != null && (csAttr.isEvictIncoming() || re.isMarkedForEviction())) { - - if (csAttr.getCriteria().doEvict(event)) { - stats.incEvictionsInProgress(); - // set the flag on event saying the entry should be evicted - // and not indexed - @Released EntryEventImpl destroyEvent = EntryEventImpl.create (owner, Operation.DESTROY, event.getKey(), - null/* newValue */, null, false, owner.getMyId()); - try { - - destroyEvent.setOldValueFromRegion(); - destroyEvent.setCustomEviction(true); - destroyEvent.setPossibleDuplicate(event.isPossibleDuplicate()); - if(logger.isDebugEnabled()) { - logger.debug("Evicting the entry " + destroyEvent); - } - if(result != null) { - removeEntry(event.getKey(),re, true, destroyEvent,owner, indexUpdater); - } - else{ - removeEntry(event.getKey(),re, true, destroyEvent,owner, null); - } - //mark the region entry for this event as evicted - event.setEvicted(); - stats.incEvictions(); - if(logger.isDebugEnabled()) { - logger.debug("Evicted the entry " + destroyEvent); - } - //removeEntry(event.getKey(), re); - } finally { - destroyEvent.release(); - stats.decEvictionsInProgress(); - } - } else { - re.clearMarkedForEviction(); - } - } - stats.endCustomEviction(startTime); } } // try } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java index 3038059..c241c6b 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java @@ -1316,7 +1316,6 @@ public class BucketAdvisor extends CacheDistributionAdvisor { ((BucketRegion)br).processPendingSecondaryExpires(); } if (br instanceof BucketRegionQueue) { // Shouldn't it be AbstractBucketRegionQueue - // i.e. this stats is not getting incremented for HDFSBucketRegionQueue!! BucketRegionQueue brq = (BucketRegionQueue)br; brq.incQueueSize(brq.size()); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java index 6e4f426..f5ae0fb 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java @@ -26,7 +26,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import org.apache.logging.log4j.Logger; @@ -35,7 +34,6 @@ import com.gemstone.gemfire.CancelException; import com.gemstone.gemfire.CopyHelper; import com.gemstone.gemfire.DataSerializer; import com.gemstone.gemfire.DeltaSerializationException; -import com.gemstone.gemfire.GemFireIOException; import com.gemstone.gemfire.InternalGemFireError; import com.gemstone.gemfire.InvalidDeltaException; import com.gemstone.gemfire.SystemFailure; @@ -43,20 +41,16 @@ import com.gemstone.gemfire.cache.CacheClosedException; import com.gemstone.gemfire.cache.CacheException; import com.gemstone.gemfire.cache.CacheWriter; import com.gemstone.gemfire.cache.CacheWriterException; -import com.gemstone.gemfire.cache.CustomEvictionAttributes; import com.gemstone.gemfire.cache.DiskAccessException; import com.gemstone.gemfire.cache.EntryNotFoundException; import com.gemstone.gemfire.cache.EvictionAction; import com.gemstone.gemfire.cache.EvictionAlgorithm; import com.gemstone.gemfire.cache.EvictionAttributes; -import com.gemstone.gemfire.cache.EvictionCriteria; import com.gemstone.gemfire.cache.ExpirationAction; import com.gemstone.gemfire.cache.Operation; import com.gemstone.gemfire.cache.RegionAttributes; import com.gemstone.gemfire.cache.RegionDestroyedException; import com.gemstone.gemfire.cache.TimeoutException; -import com.gemstone.gemfire.cache.hdfs.HDFSIOException; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer; import com.gemstone.gemfire.cache.partition.PartitionListener; import com.gemstone.gemfire.cache.query.internal.IndexUpdater; import com.gemstone.gemfire.distributed.DistributedMember; @@ -90,13 +84,11 @@ import com.gemstone.gemfire.internal.cache.versions.VersionSource; import com.gemstone.gemfire.internal.cache.versions.VersionStamp; import com.gemstone.gemfire.internal.cache.versions.VersionTag; import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; -import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; import com.gemstone.gemfire.internal.concurrent.Atomics; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; import com.gemstone.gemfire.internal.logging.log4j.LogMarker; -import com.gemstone.gemfire.internal.offheap.StoredObject; import com.gemstone.gemfire.internal.offheap.annotations.Released; import com.gemstone.gemfire.internal.offheap.annotations.Retained; import com.gemstone.gemfire.internal.offheap.annotations.Unretained; @@ -233,8 +225,6 @@ implements Bucket return eventSeqNum; } - protected final AtomicReference hoplog = new AtomicReference(); - public BucketRegion(String regionName, RegionAttributes attrs, LocalRegion parentRegion, GemFireCacheImpl cache, InternalRegionArguments internalRegionArgs) { @@ -892,12 +882,6 @@ implements Bucket beginLocalWrite(event); try { - // increment the tailKey so that invalidate operations are written to HDFS - if (this.partitionedRegion.hdfsStoreName != null) { - /* MergeGemXDHDFSToGFE Disabled this while porting. Is this required? */ - //assert this.partitionedRegion.isLocalParallelWanEnabled(); - handleWANEvent(event); - } // which performs the local op. // The ARM then calls basicInvalidatePart2 with the entry synchronized. if ( !hasSeenEvent(event) ) { @@ -1152,20 +1136,6 @@ implements Bucket if (this.partitionedRegion.isParallelWanEnabled()) { handleWANEvent(event); } - // In GemFire EVICT_DESTROY is not distributed, so in order to remove the entry - // from memory, allow the destroy to proceed. fixes #49784 - if (event.isLoadedFromHDFS() && !getBucketAdvisor().isPrimary()) { - if (logger.isDebugEnabled()) { - logger.debug("Put the destory event in HDFS queue on secondary " - + "and return as event is HDFS loaded " + event); - } - notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event); - return; - }else{ - if (logger.isDebugEnabled()) { - logger.debug("Going ahead with the destroy on GemFire system"); - } - } // This call should invoke AbstractRegionMap (aka ARM) destroy method // which calls the CacheWriter, then performs the local op. // The ARM then calls basicDestroyPart2 with the entry synchronized. @@ -1364,39 +1334,7 @@ implements Bucket } @Override - public boolean isHDFSRegion() { - return this.partitionedRegion.isHDFSRegion(); - } - - @Override - public boolean isHDFSReadWriteRegion() { - return this.partitionedRegion.isHDFSReadWriteRegion(); - } - - @Override - protected boolean isHDFSWriteOnly() { - return this.partitionedRegion.isHDFSWriteOnly(); - } - - @Override public int sizeEstimate() { - if (isHDFSReadWriteRegion()) { - try { - checkForPrimary(); - ConcurrentParallelGatewaySenderQueue q = getHDFSQueue(); - if (q == null) return 0; - int hdfsBucketRegionSize = q.getBucketRegionQueue( - partitionedRegion, getId()).size(); - int hoplogEstimate = (int) getHoplogOrganizer().sizeEstimate(); - if (logger.isDebugEnabled()) { - logger.debug("for bucket " + getName() + " estimateSize returning " - + (hdfsBucketRegionSize + hoplogEstimate)); - } - return hdfsBucketRegionSize + hoplogEstimate; - } catch (ForceReattemptException e) { - throw new PrimaryBucketException(e.getLocalizedMessage(), e); - } - } return size(); } @@ -1453,14 +1391,14 @@ implements Bucket * if there is a serialization problem * see LocalRegion#getDeserializedValue(RegionEntry, KeyInfo, boolean, boolean, boolean, EntryEventImpl, boolean, boolean, boolean) */ - private RawValue getSerialized(Object key, boolean updateStats, boolean doNotLockEntry, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) + private RawValue getSerialized(Object key, + boolean updateStats, + boolean doNotLockEntry, + EntryEventImpl clientEvent, + boolean returnTombstones) throws EntryNotFoundException, IOException { RegionEntry re = null; - if (allowReadFromHDFS) { - re = this.entries.getEntry(key); - } else { - re = this.entries.getOperationalEntryInVM(key); - } + re = this.entries.getEntry(key); if (re == null) { return NULLVALUE; } @@ -1504,13 +1442,18 @@ implements Bucket * * @param keyInfo * @param generateCallbacks - * @param clientEvent holder for the entry's version information + * @param clientEvent holder for the entry's version information * @param returnTombstones TODO * @return serialized (byte) form * @throws IOException if the result is not serializable * @see LocalRegion#get(Object, Object, boolean, EntryEventImpl) */ - public RawValue getSerialized(KeyInfo keyInfo, boolean generateCallbacks, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws IOException { + public RawValue getSerialized(KeyInfo keyInfo, + boolean generateCallbacks, + boolean doNotLockEntry, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) throws IOException { checkReadiness(); checkForNoAccess(); CachePerfStats stats = getCachePerfStats(); @@ -1520,7 +1463,7 @@ implements Bucket try { RawValue valueBytes = NULLVALUE; boolean isCreate = false; - RawValue result = getSerialized(keyInfo.getKey(), true, doNotLockEntry, clientEvent, returnTombstones, allowReadFromHDFS); + RawValue result = getSerialized(keyInfo.getKey(), true, doNotLockEntry, clientEvent, returnTombstones); isCreate = result == NULLVALUE || (result.getRawValue() == Token.TOMBSTONE && !returnTombstones); miss = (result == NULLVALUE || Token.isInvalid(result.getRawValue())); if (miss) { @@ -1532,7 +1475,7 @@ implements Bucket return REQUIRES_ENTRY_LOCK; } Object value = nonTxnFindObject(keyInfo, isCreate, - generateCallbacks, result.getRawValue(), true, true, requestingClient, clientEvent, false, allowReadFromHDFS); + generateCallbacks, result.getRawValue(), true, true, requestingClient, clientEvent, false); if (value != null) { result = new RawValue(value); } @@ -2471,36 +2414,8 @@ implements Bucket } public void beforeAcquiringPrimaryState() { - try { - createHoplogOrganizer(); - } catch (IOException e) { - // 48990: when HDFS was down, gemfirexd should still start normally - logger.warn(LocalizedStrings.HOPLOG_NOT_STARTED_YET, e); - } catch(Throwable e) { - /*MergeGemXDHDFSToGFE changed this code to checkReadiness*/ - // SystemFailure.checkThrowable(e); - this.checkReadiness(); - //49333 - no matter what, we should elect a primary. - logger.error(LocalizedStrings.LocalRegion_UNEXPECTED_EXCEPTION, e); - } - } - - public HoplogOrganizer createHoplogOrganizer() throws IOException { - if (getPartitionedRegion().isHDFSRegion()) { - HoplogOrganizer organizer = hoplog.get(); - if (organizer != null) { - // hoplog is recreated by anther thread - return organizer; - } - - HoplogOrganizer hdfs = hoplog.getAndSet(getPartitionedRegion().hdfsManager.create(getId())); - assert hdfs == null; - return hoplog.get(); - } else { - return null; - } } - + public void afterAcquiringPrimaryState() { } @@ -2508,105 +2423,13 @@ implements Bucket * Invoked when a primary bucket is demoted. */ public void beforeReleasingPrimaryLockDuringDemotion() { - releaseHoplogOrganizer(); } - protected void releaseHoplogOrganizer() { - // release resources during a clean transition - HoplogOrganizer hdfs = hoplog.getAndSet(null); - if (hdfs != null) { - getPartitionedRegion().hdfsManager.close(getId()); - } - } - - public HoplogOrganizer getHoplogOrganizer() throws HDFSIOException { - HoplogOrganizer organizer = hoplog.get(); - if (organizer == null) { - synchronized (getBucketAdvisor()) { - checkForPrimary(); - try { - organizer = createHoplogOrganizer(); - } catch (IOException e) { - throw new HDFSIOException("Failed to create Hoplog organizer due to ", e); - } - if (organizer == null) { - throw new HDFSIOException("Hoplog organizer is not available for " + this); - } - } - } - return organizer; - } - @Override public RegionAttributes getAttributes() { return this; } - @Override - public void hdfsCalled(Object key) { - this.partitionedRegion.hdfsCalled(key); - } - - @Override - protected void clearHDFSData() { - //clear the HDFS data if present - if (getPartitionedRegion().isHDFSReadWriteRegion()) { - // Clear the queue - ConcurrentParallelGatewaySenderQueue q = getHDFSQueue(); - if (q == null) return; - q.clear(getPartitionedRegion(), this.getId()); - HoplogOrganizer organizer = hoplog.get(); - if (organizer != null) { - try { - organizer.clear(); - } catch (IOException e) { - throw new GemFireIOException(LocalizedStrings.HOPLOG_UNABLE_TO_DELETE_HDFS_DATA.toLocalizedString(), e); - } - } - } - } - - public EvictionCriteria getEvictionCriteria() { - return this.partitionedRegion.getEvictionCriteria(); - } - - public CustomEvictionAttributes getCustomEvictionAttributes() { - return this.partitionedRegion.getCustomEvictionAttributes(); - } - - /** - * @return true if the evict destroy was done; false if it was not needed - */ - public boolean customEvictDestroy(Object key) - { - checkReadiness(); - @Released final EntryEventImpl event = - generateCustomEvictDestroyEvent(key); - event.setCustomEviction(true); - boolean locked = false; - try { - locked = beginLocalWrite(event); - return mapDestroy(event, - false, // cacheWrite - true, // isEviction - null); // expectedOldValue - } - catch (CacheWriterException error) { - throw new Error(LocalizedStrings.LocalRegion_CACHE_WRITER_SHOULD_NOT_HAVE_BEEN_CALLED_FOR_EVICTDESTROY.toLocalizedString(), error); - } - catch (TimeoutException anotherError) { - throw new Error(LocalizedStrings.LocalRegion_NO_DISTRIBUTED_LOCK_SHOULD_HAVE_BEEN_ATTEMPTED_FOR_EVICTDESTROY.toLocalizedString(), anotherError); - } - catch (EntryNotFoundException yetAnotherError) { - throw new Error(LocalizedStrings.LocalRegion_ENTRYNOTFOUNDEXCEPTION_SHOULD_BE_MASKED_FOR_EVICTDESTROY.toLocalizedString(), yetAnotherError); - } finally { - if (locked) { - endLocalWrite(event); - } - event.release(); - } - } - public boolean areSecondariesPingable() { Set hostingservers = this.partitionedRegion.getRegionAdvisor() http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java index 0facd93..0243cde 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java @@ -441,7 +441,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { } } - protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, int sizeOfHDFSEvent) { + protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event) { if (didPut) { if (this.initialized) { this.eventSeqNumQueue.add(key); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java index 6f673c7..4a34771 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java @@ -38,8 +38,6 @@ import com.gemstone.gemfire.cache.InterestPolicy; import com.gemstone.gemfire.cache.RegionDestroyedException; import com.gemstone.gemfire.cache.Scope; import com.gemstone.gemfire.cache.SubscriptionAttributes; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl; import com.gemstone.gemfire.distributed.Role; import com.gemstone.gemfire.distributed.internal.DistributionAdvisor; import com.gemstone.gemfire.distributed.internal.DistributionManager; @@ -1228,30 +1226,16 @@ public class CacheDistributionAdvisor extends DistributionAdvisor { public boolean include(final Profile profile) { if (profile instanceof CacheProfile) { final CacheProfile cp = (CacheProfile)profile; - /*Since HDFS queues are created only when a region is created, this check is - * unnecessary. Also this check is creating problem because hdfs queue is not - * created on an accessor. Hence removing this check for hdfs queues. */ - Set allAsyncEventIdsNoHDFS = removeHDFSQueues(allAsyncEventIds); - Set profileQueueIdsNoHDFS = removeHDFSQueues(cp.asyncEventQueueIds); - if (allAsyncEventIdsNoHDFS.equals(profileQueueIdsNoHDFS)) { + if (allAsyncEventIds.equals(cp.asyncEventQueueIds)) { return true; }else{ - differAsycnQueueIds.add(allAsyncEventIdsNoHDFS); - differAsycnQueueIds.add(profileQueueIdsNoHDFS); + differAsycnQueueIds.add(allAsyncEventIds); + differAsycnQueueIds.add(cp.asyncEventQueueIds); return false; } } return false; } - private Set removeHDFSQueues(Set queueIds){ - Set queueIdsWithoutHDFSQueues = new HashSet(); - for (String queueId: queueIds){ - if (!queueId.startsWith(HDFSStoreFactoryImpl.DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS)){ - queueIdsWithoutHDFSQueues.add(queueId); - } - } - return queueIdsWithoutHDFSQueues; - } }); return differAsycnQueueIds; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java index 382c537..ad84963 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java @@ -156,13 +156,6 @@ public class CachePerfStats { protected static final int compressionPreCompressedBytesId; protected static final int compressionPostCompressedBytesId; - protected static final int evictByCriteria_evictionsId;// total actual evictions (entries evicted) - protected static final int evictByCriteria_evictionTimeId;// total eviction time including product + user expr. - protected static final int evictByCriteria_evictionsInProgressId; - protected static final int evictByCriteria_evaluationsId;// total eviction attempts - protected static final int evictByCriteria_evaluationTimeId;// time taken to evaluate user expression. - - /** The Statistics object that we delegate most behavior to */ protected final Statistics stats; @@ -521,12 +514,6 @@ public class CachePerfStats { compressionDecompressionsId = type.nameToId("decompressions"); compressionPreCompressedBytesId = type.nameToId("preCompressedBytes"); compressionPostCompressedBytesId = type.nameToId("postCompressedBytes"); - - evictByCriteria_evictionsId = type.nameToId("evictByCriteria_evictions"); - evictByCriteria_evictionTimeId = type.nameToId("evictByCriteria_evictionTime"); - evictByCriteria_evictionsInProgressId = type.nameToId("evictByCriteria_evictionsInProgress"); - evictByCriteria_evaluationsId= type.nameToId("evictByCriteria_evaluations"); - evictByCriteria_evaluationTimeId = type.nameToId("evictByCriteria_evaluationTime"); } //////////////////////// Constructors //////////////////////// @@ -1354,66 +1341,4 @@ public class CachePerfStats { stats.incLong(exportTimeId, getStatTime() - start); } } - -// // used for the case of evict on incoming - public long startCustomEviction() { - return NanoTimer.getTime(); - } - - // used for the case of evict on incoming - public void endCustomEviction(long start) { - long ts = NanoTimer.getTime(); - stats.incLong(evictByCriteria_evictionTimeId, ts - start); - } - - public void incEvictionsInProgress() { - this.stats.incLong(evictByCriteria_evictionsInProgressId, 1); - } - - public void decEvictionsInProgress() { - this.stats.incLong(evictByCriteria_evictionsInProgressId, -1); - } - - public void incEvictions() { - this.stats.incLong(evictByCriteria_evictionsId, 1); - } - - public void incEvaluations() { - this.stats.incLong(evictByCriteria_evaluationsId, 1); - } - - public void incEvaluations(int delta) { - this.stats.incLong(evictByCriteria_evaluationsId, delta); - } - - public long startEvaluation() { - return NanoTimer.getTime(); - } - - public void endEvaluation(long start, long notEvaluationTime) { - long ts = NanoTimer.getTime(); - long totalTime = ts - start; - long evaluationTime = totalTime - notEvaluationTime; - stats.incLong(evictByCriteria_evaluationTimeId, evaluationTime); - } - - public long getEvictions() { - return stats.getLong(evictByCriteria_evictionsId); - } - - public long getEvictionsInProgress() { - return stats.getLong(evictByCriteria_evictionsInProgressId); - } - - public long getEvictionsTime() { - return stats.getLong(evictByCriteria_evictionTimeId); - } - - public long getEvaluations() { - return stats.getLong(evictByCriteria_evaluationsId); - } - - public long getEvaluationTime() { - return stats.getLong(evictByCriteria_evaluationTimeId); - } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java index 1441144..72edc10 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java @@ -107,9 +107,6 @@ public class ColocationHelper { } private static PartitionedRegion getColocatedPR( final PartitionedRegion partitionedRegion, final String colocatedWith) { - logger.info(LocalizedMessage.create( - LocalizedStrings.HOPLOG_0_COLOCATE_WITH_REGION_1_NOT_INITIALIZED_YET, - new Object[] { partitionedRegion.getFullPath(), colocatedWith })); PartitionedRegion colocatedPR = (PartitionedRegion) partitionedRegion .getCache().getPartitionedRegion(colocatedWith, false); assert colocatedPR != null; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java deleted file mode 100644 index 0c82f97..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.gemstone.gemfire.internal.cache; - -import com.gemstone.gemfire.cache.CustomEvictionAttributes; -import com.gemstone.gemfire.cache.EvictionCriteria; - -/** - * Concrete instance of {@link CustomEvictionAttributes}. - * - * @since gfxd 1.0 - */ -public final class CustomEvictionAttributesImpl extends - CustomEvictionAttributes { - - public CustomEvictionAttributesImpl(EvictionCriteria criteria, - long startTime, long interval, boolean evictIncoming) { - super(criteria, startTime, interval, evictIncoming); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java index f8475ae..cafdb80 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java @@ -145,7 +145,7 @@ public class DistTXState extends TXState { } } } // end if primary - } // end non-hdfs buckets + } } } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java index a6d2488..6a7b4f2 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java @@ -863,8 +863,6 @@ public abstract class DistributedCacheOperation { private final static int INHIBIT_NOTIFICATIONS_MASK = 0x400; - protected final static short FETCH_FROM_HDFS = 0x200; - protected final static short IS_PUT_DML = 0x100; public boolean needsRouting; @@ -1367,7 +1365,6 @@ public abstract class DistributedCacheOperation { if ((extBits & INHIBIT_NOTIFICATIONS_MASK) != 0) { this.inhibitAllNotifications = true; if (this instanceof PutAllMessage) { - ((PutAllMessage) this).setFetchFromHDFS((extBits & FETCH_FROM_HDFS) != 0); ((PutAllMessage) this).setPutDML((extBits & IS_PUT_DML) != 0); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java index 2817fdd..b6aa1b6 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java @@ -856,7 +856,6 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation PutAllMessage msg = new PutAllMessage(); msg.eventId = event.getEventId(); msg.context = event.getContext(); - msg.setFetchFromHDFS(event.isFetchFromHDFS()); msg.setPutDML(event.isPutDML()); return msg; } @@ -871,7 +870,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation public PutAllPRMessage createPRMessagesNotifyOnly(int bucketId) { final EntryEventImpl event = getBaseEvent(); PutAllPRMessage prMsg = new PutAllPRMessage(bucketId, putAllDataSize, true, - event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), false, false /*isPutDML*/); + event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), false /*isPutDML*/); if (event.getContext() != null) { prMsg.setBridgeContext(event.getContext()); } @@ -900,7 +899,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation PutAllPRMessage prMsg = (PutAllPRMessage)prMsgMap.get(bucketId); if (prMsg == null) { prMsg = new PutAllPRMessage(bucketId.intValue(), putAllDataSize, false, - event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), event.isFetchFromHDFS(), event.isPutDML()); + event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), event.isPutDML()); prMsg.setTransactionDistributed(event.getRegion().getCache().getTxManager().isDistributed()); // set dpao's context(original sender) into each PutAllMsg @@ -1077,9 +1076,6 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation protected EventID eventId = null; - // By default, fetchFromHDFS == true; - private transient boolean fetchFromHDFS = true; - private transient boolean isPutDML = false; protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START; @@ -1137,12 +1133,11 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation * the region the entry is put in */ public void doEntryPut(PutAllEntryData entry, DistributedRegion rgn, - boolean requiresRegionContext, boolean fetchFromHDFS, boolean isPutDML) { + boolean requiresRegionContext, boolean isPutDML) { @Released EntryEventImpl ev = PutAllMessage.createEntryEvent(entry, getSender(), this.context, rgn, requiresRegionContext, this.possibleDuplicate, this.needsRouting, this.callbackArg, true, skipCallbacks); - ev.setFetchFromHDFS(fetchFromHDFS); ev.setPutDML(isPutDML); // we don't need to set old value here, because the msg is from remote. local old value will get from next step try { @@ -1237,7 +1232,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation logger.debug("putAll processing {} with {} sender={}", putAllData[i], putAllData[i].versionTag, sender); } putAllData[i].setSender(sender); - doEntryPut(putAllData[i], rgn, requiresRegionContext, fetchFromHDFS, isPutDML); + doEntryPut(putAllData[i], rgn, requiresRegionContext, isPutDML); } } }, ev.getEventId()); @@ -1366,10 +1361,6 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation return Arrays.asList(ops); } - public void setFetchFromHDFS(boolean val) { - this.fetchFromHDFS = val; - } - public void setPutDML(boolean val) { this.isPutDML = val; } @@ -1377,9 +1368,6 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation @Override protected short computeCompressedExtBits(short bits) { bits = super.computeCompressedExtBits(bits); - if (fetchFromHDFS) { - bits |= FETCH_FROM_HDFS; - } if (isPutDML) { bits |= IS_PUT_DML; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java index addba8e..226d914 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java @@ -17,8 +17,6 @@ package com.gemstone.gemfire.internal.cache; -import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE; - import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -113,8 +111,6 @@ import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationE import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector; import com.gemstone.gemfire.internal.cache.versions.VersionSource; import com.gemstone.gemfire.internal.cache.versions.VersionTag; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor; import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueConfigurationException; import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException; import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; @@ -1264,8 +1260,6 @@ public class DistributedRegion extends LocalRegion implements private final Set memoryThresholdReachedMembers = new CopyOnWriteArraySet(); - private ConcurrentParallelGatewaySenderQueue hdfsQueue; - /** Sets and returns giiMissingRequiredRoles */ private boolean checkInitialImageForReliability( InternalDistributedMember imageTarget, @@ -2424,9 +2418,16 @@ public class DistributedRegion extends LocalRegion implements /** @return the deserialized value */ @Override @Retained - protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate, - TXStateInterface txState, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, - boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) + protected Object findObjectInSystem(KeyInfo keyInfo, + boolean isCreate, + TXStateInterface txState, + boolean generateCallbacks, + Object localValue, + boolean disableCopyOnRead, + boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) throws CacheLoaderException, TimeoutException { checkForLimitedOrNoAccess(); @@ -2545,18 +2546,6 @@ public class DistributedRegion extends LocalRegion implements } } - protected ConcurrentParallelGatewaySenderQueue getHDFSQueue() { - if (this.hdfsQueue == null) { - String asyncQId = this.getPartitionedRegion().getHDFSEventQueueName(); - final AsyncEventQueueImpl asyncQ = (AsyncEventQueueImpl)this.getCache().getAsyncEventQueue(asyncQId); - final AbstractGatewaySender gatewaySender = (AbstractGatewaySender)asyncQ.getSender(); - AbstractGatewaySenderEventProcessor ep = gatewaySender.getEventProcessor(); - if (ep == null) return null; - hdfsQueue = (ConcurrentParallelGatewaySenderQueue)ep.getQueue(); - } - return hdfsQueue; - } - /** hook for subclasses to note that a cache load was performed * @see BucketRegion#performedLoad */ http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java index 2b826ce..e241622 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java @@ -193,16 +193,8 @@ public class EntryEventImpl /** version tag for concurrency checks */ protected VersionTag versionTag; - /** boolean to indicate that this operation should be optimized by not fetching from HDFS*/ - private transient boolean fetchFromHDFS = true; - private transient boolean isPutDML = false; - /** boolean to indicate that the RegionEntry for this event was loaded from HDFS*/ - private transient boolean loadedFromHDFS= false; - - private transient boolean isCustomEviction = false; - /** boolean to indicate that the RegionEntry for this event has been evicted*/ private transient boolean isEvicted = false; @@ -658,14 +650,6 @@ public class EntryEventImpl return this.op.isEviction(); } - public final boolean isCustomEviction() { - return this.isCustomEviction; - } - - public final void setCustomEviction(boolean customEvict) { - this.isCustomEviction = customEvict; - } - public final void setEvicted() { this.isEvicted = true; } @@ -3047,13 +3031,6 @@ public class EntryEventImpl public boolean isOldValueOffHeap() { return isOffHeapReference(this.oldValue); } - public final boolean isFetchFromHDFS() { - return fetchFromHDFS; - } - - public final void setFetchFromHDFS(boolean fetchFromHDFS) { - this.fetchFromHDFS = fetchFromHDFS; - } public final boolean isPutDML() { return this.isPutDML; @@ -3062,12 +3039,4 @@ public class EntryEventImpl public final void setPutDML(boolean val) { this.isPutDML = val; } - - public final boolean isLoadedFromHDFS() { - return loadedFromHDFS; - } - - public final void setLoadedFromHDFS(boolean loadedFromHDFS) { - this.loadedFromHDFS = loadedFromHDFS; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java deleted file mode 100644 index 9054d6d..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache; - -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import com.gemstone.gemfire.cache.CacheClosedException; -import com.gemstone.gemfire.cache.EvictionCriteria; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.google.common.util.concurrent.AbstractScheduledService; -import com.gemstone.gemfire.internal.offheap.Releasable; -/** - * Schedules each iteration periodically. EvictorService takes absolute time and - * a period as input and schedules Eviction at absolute times by calculating the - * interval. For scheduling the next eviction iteration it also takes into - * account the time taken to complete one iteration. If an iteration takes more - * time than the specified period then another iteration is scheduled - * immediately. - * - * - */ - -public class EvictorService extends AbstractScheduledService { - - private final EvictionCriteria criteria; - - // period is always in seconds - private long interval; - - private volatile boolean stopScheduling; - - private long nextScheduleTime; - - private GemFireCacheImpl cache; - - private Region region; - - private volatile ScheduledExecutorService executorService; - - public EvictorService(EvictionCriteria criteria, - long evictorStartTime, long evictorInterval, TimeUnit unit, Region r) { - this.criteria = criteria; - this.interval = unit.toSeconds(evictorInterval); - this.region = r; - try { - this.cache = GemFireCacheImpl.getExisting(); - } catch (CacheClosedException cce) { - - } - //TODO: Unless we revisit System.currentTimeMillis or cacheTimeMillis keep the default -// long now = (evictorStartTime != 0 ? evictorStartTime -// + this.cache.getDistributionManager().getCacheTimeOffset() : this.cache -// .getDistributionManager().cacheTimeMillis()) / 1000; - long now = this.cache.getDistributionManager().cacheTimeMillis() / 1000; - if (this.cache.getLoggerI18n().fineEnabled()) { - this.cache.getLoggerI18n().fine( - "EvictorService: The startTime(now) is " + now + " evictorStartTime : " + evictorStartTime); - } - - this.nextScheduleTime = now + 10; - - if (this.cache.getLoggerI18n().fineEnabled()) { - this.cache.getLoggerI18n().fine( - "EvictorService: The startTime is " + this.nextScheduleTime); - } - } - - @Override - protected void runOneIteration() throws Exception { - if (this.cache.getLoggerI18n().fineEnabled()) { - this.cache.getLoggerI18n() - .fine( - "EvictorService: Running the iteration at " - + cache.cacheTimeMillis()); - } - if (stopScheduling || checkCancelled(cache)) { - stopScheduling(); // if check cancelled - if (this.cache.getLoggerI18n().fineEnabled()) { - this.cache - .getLoggerI18n() - .fine( - "EvictorService: Abort eviction since stopScheduling OR cancel in progress. Evicted 0 entries "); - } - return; - } - CachePerfStats stats = ((LocalRegion)this.region).getCachePerfStats(); - long startEvictionTime = stats.startCustomEviction(); - int evicted = 0; - long startEvaluationTime = stats.startEvaluation(); - Iterator> keysItr = null; - long totalIterationsTime = 0; - - keysItr = this.criteria.getKeysToBeEvicted(this.cache - .getDistributionManager().cacheTimeMillis(), this.region); - try { - stats.incEvaluations(this.region.size()); - // if we have been asked to stop scheduling - // or the cache is closing stop in between. - - - while (keysItr.hasNext() && !stopScheduling && !checkCancelled(cache)) { - Map.Entry entry = keysItr.next(); - long startIterationTime = this.cache - .getDistributionManager().cacheTimeMillis(); - Object routingObj = entry.getValue(); - if (this.cache.getLoggerI18n().fineEnabled()) { - this.cache.getLoggerI18n().fine( - "EvictorService: Going to evict the following entry " + entry); - } - if (this.region instanceof PartitionedRegion) { - try { - PartitionedRegion pr = (PartitionedRegion)this.region; - stats.incEvictionsInProgress(); - int bucketId = PartitionedRegionHelper.getHashKey(pr, routingObj); - BucketRegion br = pr.getDataStore().getLocalBucketById(bucketId); - // This has to be called on BucketRegion directly and not on the PR as - // PR doesn't allow operation on Secondary buckets. - if (br != null) { - if (this.cache.getLoggerI18n().fineEnabled()) { - this.cache.getLoggerI18n().fine( - "EvictorService: Going to evict the following entry " + entry - + " from bucket " + br); - } - if (br.getBucketAdvisor().isPrimary()) { - boolean succ = false; - try { - succ = br.customEvictDestroy(entry.getKey()); - } catch (PrimaryBucketException e) { - if (this.cache.getLoggerI18n().fineEnabled()) { - this.cache.getLoggerI18n().warning( - LocalizedStrings.EVICTORSERVICE_CAUGHT_EXCEPTION_0, e); - } - } - - if (succ) - evicted++; - if (this.cache.getLoggerI18n().fineEnabled()) { - this.cache.getLoggerI18n() - .fine( - "EvictorService: Evicted the following entry " + entry - + " from bucket " + br + " successfully " + succ - + " the value in buk is " /* - * + - * br.get(entry.getKey()) - */); - } - } - } - stats.incEvictions(); - } catch (Exception e) { - if (this.cache.getLoggerI18n().fineEnabled()) { - this.cache.getLoggerI18n().warning( - LocalizedStrings.EVICTORSERVICE_CAUGHT_EXCEPTION_0, e); - } - // TODO: - // Do the exception handling . - // Check if the bucket is present - // If the entry could not be evicted then log the warning - // Log any other exception. - }finally{ - stats.decEvictionsInProgress(); - long endIterationTime = this.cache - .getDistributionManager().cacheTimeMillis(); - totalIterationsTime += (endIterationTime - startIterationTime); - } - } - } - }finally { - if(keysItr instanceof Releasable) { - ((Releasable)keysItr).release(); - } - } - stats.endEvaluation(startEvaluationTime, totalIterationsTime); - - if (this.cache.getLoggerI18n().fineEnabled()) { - this.cache.getLoggerI18n().fine( - "EvictorService: Completed an iteration at time " - + this.cache.getDistributionManager().cacheTimeMillis() / 1000 - + ". Evicted " + evicted + " entries."); - } - stats.endCustomEviction(startEvictionTime); - } - - private boolean checkCancelled(GemFireCacheImpl cache) { - if (cache.getCancelCriterion().cancelInProgress() != null) { - return true; - } - return false; - } - - @Override - protected Scheduler scheduler() { - return new CustomScheduler() { - @Override - protected Schedule getNextSchedule() throws Exception { - // get the current time in seconds from DM. - // it takes care of clock skew etc in different VMs - long now = cache.getDistributionManager().cacheTimeMillis() / 1000; - if (cache.getLoggerI18n().fineEnabled()) { - cache.getLoggerI18n().fine("EvictorService: Now is " + now); - } - long delay = 0; - if (now < nextScheduleTime) { - delay = nextScheduleTime - now; - } - nextScheduleTime += interval; - // calculate the next immediate time i.e. schedule time in seconds - // set the schedule.delay to that scheduletime - currenttime - if (cache.getLoggerI18n().fineEnabled()) { - cache.getLoggerI18n().fine( - "EvictorService: Returning the next schedule with delay " + delay - + " next schedule is at : " + nextScheduleTime); - } - - return new Schedule(delay, TimeUnit.SECONDS); - } - }; - } - - /** - * Region.destroy and Region.close should make sure to call this method. This - * will be called here. - */ - public void stopScheduling() { - this.stopScheduling = true; - } - - // this will be called when we stop the service. - // not sure if we have to do any cleanup - // to stop the service call stop() - @Override - protected void shutDown() throws Exception { - this.executorService.shutdownNow(); - this.region= null; - this.cache = null; - } - - // This will be called when we start the service. - // not sure if we have to any intialization - @Override - protected void startUp() throws Exception { - - } - - public void changeEvictionInterval(long newInterval) { - this.interval = newInterval / 1000; - if (cache.getLoggerI18n().fineEnabled()) { - cache.getLoggerI18n().fine( - "EvictorService: New interval is " + this.interval); - } - } - - public void changeStartTime(long newStart) { - this.nextScheduleTime = newStart/1000; - if (cache.getLoggerI18n().fineEnabled()) { - cache.getLoggerI18n().fine("EvictorService: New start time is " + this.nextScheduleTime); - } - } - - protected ScheduledExecutorService executor() { - this.executorService = super.executor(); - return this.executorService; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java index cc9727b..c477466 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java @@ -126,16 +126,6 @@ import com.gemstone.gemfire.cache.client.internal.ClientMetadataService; import com.gemstone.gemfire.cache.client.internal.ClientRegionFactoryImpl; import com.gemstone.gemfire.cache.client.internal.PoolImpl; import com.gemstone.gemfire.cache.execute.FunctionService; -import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionFunction; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSLastCompactionTimeFunction; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector; import com.gemstone.gemfire.cache.query.QueryService; import com.gemstone.gemfire.cache.query.internal.DefaultQuery; import com.gemstone.gemfire.cache.query.internal.DefaultQueryService; @@ -932,9 +922,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer HARegionQueue.setMessageSyncInterval(HARegionQueue.DEFAULT_MESSAGE_SYNC_INTERVAL); } FunctionService.registerFunction(new PRContainsValueFunction()); - FunctionService.registerFunction(new HDFSLastCompactionTimeFunction()); - FunctionService.registerFunction(new HDFSForceCompactionFunction()); - FunctionService.registerFunction(new HDFSFlushQueueFunction()); this.expirationScheduler = new ExpirationScheduler(this.system); // uncomment following line when debugging CacheExistsException @@ -2185,8 +2172,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer closeDiskStores(); diskMonitor.close(); - closeHDFSStores(); - // Close the CqService Handle. try { if (isDebugEnabled) { @@ -2272,7 +2257,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer } catch (CancelException e) { // make sure the disk stores get closed closeDiskStores(); - closeHDFSStores(); // NO DISTRIBUTED MESSAGING CAN BE DONE HERE! // okay, we're taking too long to do this stuff, so let's @@ -3119,8 +3103,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer future = (Future) this.reinitializingRegions.get(fullPath); } if (future == null) { - HDFSIntegrationUtil.createAndAddAsyncQueue(regionPath, attrs, this); - attrs = setEvictionAttributesForLargeRegion(attrs); if (internalRegionArgs.getInternalMetaRegion() != null) { rgn = internalRegionArgs.getInternalMetaRegion(); } else if (isPartitionedRegion) { @@ -3245,54 +3227,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer } } - /** - * turn on eviction by default for HDFS regions - */ - @SuppressWarnings("deprecation") - public RegionAttributes setEvictionAttributesForLargeRegion( - RegionAttributes attrs) { - RegionAttributes ra = attrs; - if (DISABLE_AUTO_EVICTION) { - return ra; - } - if (attrs.getDataPolicy().withHDFS() - || attrs.getHDFSStoreName() != null) { - // make the region overflow by default - EvictionAttributes evictionAttributes = attrs.getEvictionAttributes(); - boolean hasNoEvictionAttrs = evictionAttributes == null - || evictionAttributes.getAlgorithm().isNone(); - AttributesFactory af = new AttributesFactory(attrs); - String diskStoreName = attrs.getDiskStoreName(); - // set the local persistent directory to be the same as that for - // HDFS store - if (attrs.getHDFSStoreName() != null) { - HDFSStoreImpl hdfsStore = findHDFSStore(attrs.getHDFSStoreName()); - if (attrs.getPartitionAttributes().getLocalMaxMemory() != 0 && hdfsStore == null) { - // HDFS store expected to be found at this point - throw new IllegalStateException( - LocalizedStrings.HOPLOG_HDFS_STORE_NOT_FOUND - .toLocalizedString(attrs.getHDFSStoreName())); - } - // if there is no disk store, use the one configured for hdfs queue - if (attrs.getPartitionAttributes().getLocalMaxMemory() != 0 && diskStoreName == null) { - diskStoreName = hdfsStore.getDiskStoreName(); - } - } - // set LRU heap eviction with overflow to disk for HDFS stores with - // local Oplog persistence - // set eviction attributes only if not set - if (hasNoEvictionAttrs) { - if (diskStoreName != null) { - af.setDiskStoreName(diskStoreName); - } - af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes( - ObjectSizer.DEFAULT, EvictionAction.OVERFLOW_TO_DISK)); - } - ra = af.create(); - } - return ra; - } - public final Region getRegion(String path) { return getRegion(path, false); } @@ -5403,39 +5337,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer } } - public HDFSStoreFactory createHDFSStoreFactory(HDFSStoreCreation creation) { - return new HDFSStoreFactoryImpl(this, creation); - } - public void addHDFSStore(HDFSStoreImpl hsi) { - HDFSStoreDirector.getInstance().addHDFSStore(hsi); - //TODO:HDFS Add a resource event for hdfs store creation as well - // like the following disk store event - //system.handleResourceEvent(ResourceEvent.DISKSTORE_CREATE, dsi); - } - - public void removeHDFSStore(HDFSStoreImpl hsi) { - //hsi.destroy(); - HDFSStoreDirector.getInstance().removeHDFSStore(hsi.getName()); - //TODO:HDFS Add a resource event for hdfs store as well - // like the following disk store event - //system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi); - } - - public void closeHDFSStores() { - HDFSRegionDirector.reset(); - HDFSStoreDirector.getInstance().closeHDFSStores(); - } - - - public HDFSStoreImpl findHDFSStore(String name) { - return HDFSStoreDirector.getInstance().getHDFSStore(name); - } - - public Collection getHDFSStores() { - return HDFSStoreDirector.getInstance().getAllHDFSStores(); - } - - public TemporaryResultSetFactory getResultSetFactory() { return this.resultSetFactory; }