Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E8DD718FB6 for ; Thu, 28 Apr 2016 23:10:05 +0000 (UTC) Received: (qmail 13093 invoked by uid 500); 28 Apr 2016 23:10:05 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 13061 invoked by uid 500); 28 Apr 2016 23:10:05 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 13052 invoked by uid 99); 28 Apr 2016 23:10:05 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Apr 2016 23:10:05 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 3A9F91806C4 for ; Thu, 28 Apr 2016 23:10:05 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id aIvgf6BPkUhk for ; Thu, 28 Apr 2016 23:09:46 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 7BE875FCE8 for ; Thu, 28 Apr 2016 23:09:42 +0000 (UTC) Received: (qmail 9087 invoked by uid 99); 28 Apr 2016 23:09:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Apr 2016 23:09:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C0DE8E00C7; Thu, 28 Apr 2016 23:09:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: klund@apache.org To: commits@geode.incubator.apache.org Date: Thu, 28 Apr 2016 23:10:03 -0000 Message-Id: In-Reply-To: <82e5948c82d540f8bec515c234fa1706@git.apache.org> References: <82e5948c82d540f8bec515c234fa1706@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [24/50] [abbrv] incubator-geode git commit: GEODE-1072: Removing HDFS related code http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java index c75286e..328c196 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java @@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; @@ -93,22 +92,12 @@ import com.gemstone.gemfire.cache.TransactionDataNotColocatedException; import com.gemstone.gemfire.cache.TransactionDataRebalancedException; import com.gemstone.gemfire.cache.TransactionException; import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; -import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats; import com.gemstone.gemfire.cache.execute.EmtpyRegionFunctionException; import com.gemstone.gemfire.cache.execute.Function; import com.gemstone.gemfire.cache.execute.FunctionContext; import com.gemstone.gemfire.cache.execute.FunctionException; import com.gemstone.gemfire.cache.execute.FunctionService; import com.gemstone.gemfire.cache.execute.ResultCollector; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.CompactionStatus; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionArgs; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionFunction; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionResultCollector; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSLastCompactionTimeFunction; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer; import com.gemstone.gemfire.cache.partition.PartitionListener; import com.gemstone.gemfire.cache.partition.PartitionNotAvailableException; import com.gemstone.gemfire.cache.query.FunctionDomainException; @@ -224,7 +213,6 @@ import com.gemstone.gemfire.internal.cache.partitioned.PutAllPRMessage; import com.gemstone.gemfire.internal.cache.partitioned.PutMessage; import com.gemstone.gemfire.internal.cache.partitioned.PutMessage.PutResult; import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor; -import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.BucketVisitor; import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.PartitionProfile; import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage; import com.gemstone.gemfire.internal.cache.partitioned.RemoveIndexesMessage; @@ -256,7 +244,6 @@ import com.gemstone.gemfire.internal.offheap.annotations.Released; import com.gemstone.gemfire.internal.offheap.annotations.Unretained; import com.gemstone.gemfire.internal.sequencelog.RegionLogger; import com.gemstone.gemfire.internal.util.TransformUtils; -import com.gemstone.gemfire.internal.util.concurrent.FutureResult; import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch; import com.gemstone.gemfire.i18n.StringId; @@ -708,17 +695,9 @@ public class PartitionedRegion extends LocalRegion implements private final PartitionListener[] partitionListeners; private boolean isShadowPR = false; - private boolean isShadowPRForHDFS = false; - + private AbstractGatewaySender parallelGatewaySender = null; - private final ThreadLocal queryHDFS = new ThreadLocal() { - @Override - protected Boolean initialValue() { - return false; - } - }; - public PartitionedRegion(String regionname, RegionAttributes ra, LocalRegion parentRegion, GemFireCacheImpl cache, InternalRegionArguments internalRegionArgs) { @@ -738,12 +717,6 @@ public class PartitionedRegion extends LocalRegion implements // (which prevents pridmap cleanup). cache.getDistributedSystem().addDisconnectListener(dsPRIdCleanUpListener); - // add an async queue for the region if the store name is not null. - if (this.getHDFSStoreName() != null) { - String eventQueueName = getHDFSEventQueueName(); - super.addAsyncEventQueueId(eventQueueName); - } - // this.userScope = ra.getScope(); this.partitionAttributes = ra.getPartitionAttributes(); this.localMaxMemory = this.partitionAttributes.getLocalMaxMemory(); @@ -822,8 +795,6 @@ public class PartitionedRegion extends LocalRegion implements if (internalRegionArgs.isUsedForParallelGatewaySenderQueue()) { this.isShadowPR = true; this.parallelGatewaySender = internalRegionArgs.getParallelGatewaySender(); - if (internalRegionArgs.isUsedForHDFSParallelGatewaySenderQueue()) - this.isShadowPRForHDFS = true; } @@ -867,38 +838,10 @@ public class PartitionedRegion extends LocalRegion implements }); } - @Override - public final boolean isHDFSRegion() { - return this.getHDFSStoreName() != null; - } - - @Override - public final boolean isHDFSReadWriteRegion() { - return isHDFSRegion() && !getHDFSWriteOnly(); - } - - @Override - protected final boolean isHDFSWriteOnly() { - return isHDFSRegion() && getHDFSWriteOnly(); - } - - public final void setQueryHDFS(boolean includeHDFS) { - queryHDFS.set(includeHDFS); - } - - @Override - public final boolean includeHDFSResults() { - return queryHDFS.get(); - } - public final boolean isShadowPR() { return isShadowPR; } - public final boolean isShadowPRForHDFS() { - return isShadowPRForHDFS; - } - public AbstractGatewaySender getParallelGatewaySender() { return parallelGatewaySender; } @@ -1664,7 +1607,7 @@ public class PartitionedRegion extends LocalRegion implements try { final boolean loc = (this.localMaxMemory > 0) && retryNode.equals(getMyId()); if (loc) { - ret = this.dataStore.getEntryLocally(bucketId, key, access, allowTombstones, true); + ret = this.dataStore.getEntryLocally(bucketId, key, access, allowTombstones); } else { ret = getEntryRemotely(retryNode, bucketIdInt, key, access, allowTombstones); // TODO:Suranjan&Yogesh : there should be better way than this one @@ -2123,8 +2066,7 @@ public class PartitionedRegion extends LocalRegion implements bucketStorageAssigned=false; // if this is a Delta update, then throw exception since the key doesn't // exist if there is no bucket for it yet - // For HDFS region, we will recover key, so allow bucket creation - if (!this.dataPolicy.withHDFS() && event.hasDelta()) { + if (event.hasDelta()) { throw new EntryNotFoundException(LocalizedStrings. PartitionedRegion_CANNOT_APPLY_A_DELTA_WITHOUT_EXISTING_ENTRY .toLocalizedString()); @@ -3319,9 +3261,9 @@ public class PartitionedRegion extends LocalRegion implements */ @Override public Object get(Object key, Object aCallbackArgument, - boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD, - ClientProxyMembershipID requestingClient, - EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws TimeoutException, CacheLoaderException + boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, boolean returnTombstones) throws TimeoutException, CacheLoaderException { validateKey(key); validateCallbackArg(aCallbackArgument); @@ -3335,7 +3277,7 @@ public class PartitionedRegion extends LocalRegion implements // if scope is local and there is no loader, then // don't go further to try and get value Object value = getDataView().findObject(getKeyInfo(key, aCallbackArgument), this, true/*isCreate*/, generateCallbacks, - null /*no local value*/, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS); + null /*no local value*/, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones); if (value != null && !Token.isInvalid(value)) { miss = false; } @@ -3381,7 +3323,7 @@ public class PartitionedRegion extends LocalRegion implements if (primary == null) { return null; } - if (isTX() || this.hdfsStoreName != null) { + if (isTX()) { return getNodeForBucketWrite(bucketId, null); } InternalDistributedMember result = getRegionAdvisor().getPreferredNode(bucketId); @@ -3395,7 +3337,7 @@ public class PartitionedRegion extends LocalRegion implements */ private InternalDistributedMember getNodeForBucketReadOrLoad(int bucketId) { InternalDistributedMember targetNode; - if (!this.haveCacheLoader && (this.hdfsStoreName == null)) { + if (!this.haveCacheLoader) { targetNode = getNodeForBucketRead(bucketId); } else { @@ -3528,9 +3470,16 @@ public class PartitionedRegion extends LocalRegion implements } @Override - protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate, - TXStateInterface tx, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, - EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) + protected Object findObjectInSystem(KeyInfo keyInfo, + boolean isCreate, + TXStateInterface tx, + boolean generateCallbacks, + Object localValue, + boolean disableCopyOnRead, + boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) throws CacheLoaderException, TimeoutException { Object obj = null; @@ -3566,7 +3515,7 @@ public class PartitionedRegion extends LocalRegion implements return null; } - obj = getFromBucket(targetNode, bucketId, key, aCallbackArgument, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowRetry, allowReadFromHDFS); + obj = getFromBucket(targetNode, bucketId, key, aCallbackArgument, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowRetry); } finally { this.prStats.endGet(startTime); @@ -4149,15 +4098,22 @@ public class PartitionedRegion extends LocalRegion implements /** * no docs - * @param preferCD + * @param preferCD * @param requestingClient the client requesting the object, or null if not from a client * @param clientEvent TODO * @param returnTombstones TODO * @param allowRetry if false then do not retry */ private Object getFromBucket(final InternalDistributedMember targetNode, - int bucketId, final Object key, final Object aCallbackArgument, - boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowRetry, boolean allowReadFromHDFS) { + int bucketId, + final Object key, + final Object aCallbackArgument, + boolean disableCopyOnRead, + boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones, + boolean allowRetry) { final boolean isDebugEnabled = logger.isDebugEnabled(); final int retryAttempts = calcRetry(); @@ -4187,7 +4143,7 @@ public class PartitionedRegion extends LocalRegion implements try { if (isLocal) { obj = this.dataStore.getLocally(bucketId, key, aCallbackArgument, - disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false, allowReadFromHDFS); + disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false); } else { if (localCacheEnabled && null != (obj = localCacheGet(key))) { // OFFHEAP: copy into heap cd; TODO optimize for preferCD case @@ -4196,14 +4152,14 @@ public class PartitionedRegion extends LocalRegion implements } return obj; } - else if (this.haveCacheLoader || this.hdfsStoreName != null) { + else if (this.haveCacheLoader) { // If the region has a cache loader, // the target node is the primary server of the bucket. But, if the // value can be found in a local bucket, we should first try there. /* MergeGemXDHDFSToGFE -readoing from local bucket was disabled in GemXD*/ if (null != ( obj = getFromLocalBucket(bucketId, key, aCallbackArgument, - disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS))) { + disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones))) { return obj; } } @@ -4211,7 +4167,7 @@ public class PartitionedRegion extends LocalRegion implements // Test hook if (((LocalRegion)this).isTest()) ((LocalRegion)this).incCountNotFoundInLocal(); - obj = getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS); + obj = getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, clientEvent, returnTombstones); // TODO:Suranjan&Yogesh : there should be better way than this one String name = Thread.currentThread().getName(); @@ -4309,9 +4265,9 @@ public class PartitionedRegion extends LocalRegion implements * */ public Object getFromLocalBucket(int bucketId, final Object key, - final Object aCallbackArgument, boolean disableCopyOnRead, - boolean preferCD, ClientProxyMembershipID requestingClient, - EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) + final Object aCallbackArgument, boolean disableCopyOnRead, + boolean preferCD, ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, boolean returnTombstones) throws ForceReattemptException, PRLocallyDestroyedException { Object obj; // try reading locally. @@ -4320,7 +4276,7 @@ public class PartitionedRegion extends LocalRegion implements return null; // fixes 51657 } if (readNode.equals(getMyId()) && null != ( obj = this.dataStore.getLocally(bucketId, key, aCallbackArgument, - disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, true, allowReadFromHDFS))) { + disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, true))) { if (logger.isTraceEnabled()) { logger.trace("getFromBucket: Getting key {} ({}) locally - success", key, key.hashCode()); } @@ -5116,7 +5072,13 @@ public class PartitionedRegion extends LocalRegion implements * if the peer is no longer available */ public Object getRemotely(InternalDistributedMember targetNode, - int bucketId, final Object key, final Object aCallbackArgument, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws PrimaryBucketException, + int bucketId, + final Object key, + final Object aCallbackArgument, + boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) throws PrimaryBucketException, ForceReattemptException { Object value; if (logger.isDebugEnabled()) { @@ -5124,7 +5086,7 @@ public class PartitionedRegion extends LocalRegion implements getPRId(), BUCKET_ID_SEPARATOR, bucketId, key); } GetResponse response = GetMessage.send(targetNode, this, key, - aCallbackArgument, requestingClient, returnTombstones, allowReadFromHDFS); + aCallbackArgument, requestingClient, returnTombstones); this.prStats.incPartitionMessagesSent(); value = response.waitForResponse(preferCD); if (clientEvent != null) { @@ -7078,9 +7040,6 @@ public class PartitionedRegion extends LocalRegion implements public int entryCount(Set buckets, boolean estimate) { Map bucketSizes = null; - if (isHDFSReadWriteRegion() && (includeHDFSResults() || estimate)) { - bucketSizes = getSizeForHDFS( buckets, estimate); - } else { if (buckets != null) { if (this.dataStore != null) { List list = new ArrayList(); @@ -7112,7 +7071,6 @@ public class PartitionedRegion extends LocalRegion implements } } } - } int size = 0; if (bucketSizes != null) { @@ -7135,81 +7093,7 @@ public class PartitionedRegion extends LocalRegion implements return 0; } } - private Map getSizeForHDFS(final Set buckets, boolean estimate) { - // figure out which buckets to include - Map bucketSizes = new HashMap(); - getRegionAdvisor().accept(new BucketVisitor>() { - @Override - public boolean visit(RegionAdvisor advisor, ProxyBucketRegion pbr, - Map map) { - if (buckets == null || buckets.contains(pbr.getBucketId())) { - map.put(pbr.getBucketId(), null); - // ensure that the bucket has been created - pbr.getPartitionedRegion().getOrCreateNodeForBucketWrite(pbr.getBucketId(), null); - } - return true; - } - }, bucketSizes); - RetryTimeKeeper retry = new RetryTimeKeeper(retryTimeout); - - while (true) { - // get the size from local buckets - if (dataStore != null) { - Map localSizes; - if (estimate) { - localSizes = dataStore.getSizeEstimateForLocalPrimaryBuckets(); - } else { - localSizes = dataStore.getSizeForLocalPrimaryBuckets(); - } - for (Map.Entry me : localSizes.entrySet()) { - if (bucketSizes.containsKey(me.getKey())) { - bucketSizes.put(me.getKey(), me.getValue()); - } - } - } - // all done - int count = 0; - Iterator it = bucketSizes.values().iterator(); - while (it.hasNext()) { - if (it.next() != null) count++; - } - if (bucketSizes.size() == count) { - return bucketSizes; - } - - Set remotes = getRegionAdvisor().adviseDataStore(true); - remotes.remove(getMyId()); - - // collect remote sizes - if (!remotes.isEmpty()) { - Map remoteSizes = new HashMap(); - try { - remoteSizes = getSizeRemotely(remotes, estimate); - } catch (ReplyException e) { - // Remote member will never throw ForceReattemptException or - // PrimaryBucketException, so any exception on the remote member - // should be re-thrown - e.handleAsUnexpected(); - } - for (Map.Entry me : remoteSizes.entrySet()) { - Integer k = me.getKey(); - if (bucketSizes.containsKey(k) && me.getValue().isPrimary()) { - bucketSizes.put(k, me.getValue()); - } - } - } - - if (retry.overMaximum()) { - checkReadiness(); - PRHARedundancyProvider.timedOut(this, null, null, "calculate size", retry.getRetryTime()); - } - - // throttle subsequent attempts - retry.waitForBucketsRecovery(); - } - } - /** * This method gets a PartitionServerSocketConnection to targetNode and sends * size request to the node. It returns size of all the buckets "primarily" @@ -7607,9 +7491,7 @@ public class PartitionedRegion extends LocalRegion implements .append("; isClosed=").append(this.isClosed) .append("; retryTimeout=").append(this.retryTimeout) .append("; serialNumber=").append(getSerialNumber()) - .append("; hdfsStoreName=").append(getHDFSStoreName()) - .append("; hdfsWriteOnly=").append(getHDFSWriteOnly()) - + .append("; partition attributes=").append(getPartitionAttributes().toString()) .append("; on VM ").append(getMyId()) .append("]") @@ -7752,18 +7634,6 @@ public class PartitionedRegion extends LocalRegion implements @Override public void destroyRegion(Object aCallbackArgument) throws CacheWriterException, TimeoutException { - //For HDFS regions, we need a data store - //to do the global destroy so that it can delete - //the data from HDFS as well. - if(!isDataStore() && this.dataPolicy.withHDFS()) { - if(destroyOnDataStore(aCallbackArgument)) { - //If we were able to find a data store to do the destroy, - //stop here. - //otherwise go ahead and destroy the region from this member - return; - } - } - checkForColocatedChildren(); getDataView().checkSupportsRegionDestroy(); checkForLimitedOrNoAccess(); @@ -7811,7 +7681,6 @@ public class PartitionedRegion extends LocalRegion implements boolean keepWaiting = true; - AsyncEventQueueImpl hdfsQueue = getHDFSEventQueue(); while(true) { List pausedSenders = new ArrayList(); List parallelQueues = new ArrayList(); @@ -7929,11 +7798,6 @@ public class PartitionedRegion extends LocalRegion implements } } } - - if(hdfsQueue != null) { - hdfsQueue.destroy(); - cache.removeAsyncEventQueue(hdfsQueue); - } } @Override @@ -8114,9 +7978,6 @@ public class PartitionedRegion extends LocalRegion implements final boolean isClose = event.getOperation().isClose(); destroyPartitionedRegionLocally(!isClose); destroyCleanUp(event, serials); - if(!isClose) { - destroyHDFSData(); - } return true; } @@ -8409,8 +8270,6 @@ public class PartitionedRegion extends LocalRegion implements } } - HDFSRegionDirector.getInstance().clear(getFullPath()); - RegionLogger.logDestroy(getName(), cache.getMyId(), null, op.isClose()); } @@ -11055,11 +10914,6 @@ public class PartitionedRegion extends LocalRegion implements } } - //hoplogs - pause HDFS dispatcher while we - //clear the buckets to avoid missing some files - //during the clear - pauseHDFSDispatcher(); - try { // now clear the bucket regions; we go through the primary bucket // regions so there is distribution for every bucket but that @@ -11075,7 +10929,6 @@ public class PartitionedRegion extends LocalRegion implements } } } finally { - resumeHDFSDispatcher(); // release the bucket locks for (BucketRegion br : lockedRegions) { try { @@ -11091,247 +10944,6 @@ public class PartitionedRegion extends LocalRegion implements } } - - /**Destroy all data in HDFS, if this region is using HDFS persistence.*/ - private void destroyHDFSData() { - if(getHDFSStoreName() == null) { - return; - } - - try { - hdfsManager.destroyData(); - } catch (IOException e) { - logger.warn(LocalizedStrings.HOPLOG_UNABLE_TO_DELETE_HDFS_DATA, e); - } - } - - private void pauseHDFSDispatcher() { - if(!isHDFSRegion()) { - return; - } - AbstractGatewaySenderEventProcessor eventProcessor = getHDFSEventProcessor(); - if (eventProcessor == null) return; - eventProcessor.pauseDispatching(); - eventProcessor.waitForDispatcherToPause(); - } - - /** - * Get the statistics for the HDFS event queue associated with this region, - * if any - */ - public AsyncEventQueueStats getHDFSEventQueueStats() { - AsyncEventQueueImpl asyncQ = getHDFSEventQueue(); - if(asyncQ == null) { - return null; - } - return asyncQ.getStatistics(); - } - - protected AbstractGatewaySenderEventProcessor getHDFSEventProcessor() { - final AsyncEventQueueImpl asyncQ = getHDFSEventQueue(); - final AbstractGatewaySender gatewaySender = (AbstractGatewaySender)asyncQ.getSender(); - AbstractGatewaySenderEventProcessor eventProcessor = gatewaySender.getEventProcessor(); - return eventProcessor; - } - - public AsyncEventQueueImpl getHDFSEventQueue() { - String asyncQId = getHDFSEventQueueName(); - if(asyncQId == null) { - return null; - } - final AsyncEventQueueImpl asyncQ = (AsyncEventQueueImpl)this.getCache().getAsyncEventQueue(asyncQId); - return asyncQ; - } - - private void resumeHDFSDispatcher() { - if(!isHDFSRegion()) { - return; - } - AbstractGatewaySenderEventProcessor eventProcessor = getHDFSEventProcessor(); - if (eventProcessor == null) return; - eventProcessor.resumeDispatching(); - } - - protected String getHDFSEventQueueName() { - if (!this.getDataPolicy().withHDFS()) return null; - String colocatedWith = this.getPartitionAttributes().getColocatedWith(); - String eventQueueName; - if (colocatedWith != null) { - PartitionedRegion leader = ColocationHelper.getLeaderRegionName(this); - eventQueueName = HDFSStoreFactoryImpl.getEventQueueName(leader - .getFullPath()); - } - else { - eventQueueName = HDFSStoreFactoryImpl.getEventQueueName(getFullPath()); - } - return eventQueueName; - } - - /** - * schedules compaction on all members where this region is hosted. - * - * @param isMajor - * true for major compaction - * @param maxWaitTime - * time to wait for the operation to complete, 0 will wait forever - */ - @Override - public void forceHDFSCompaction(boolean isMajor, Integer maxWaitTime) { - if (!this.isHDFSReadWriteRegion()) { - if (this.isHDFSRegion()) { - throw new UnsupportedOperationException( - LocalizedStrings.HOPLOG_CONFIGURED_AS_WRITEONLY - .toLocalizedString(getName())); - } - throw new UnsupportedOperationException( - LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE - .toLocalizedString(getName())); - } - // send request to remote data stores - long start = System.currentTimeMillis(); - int waitTime = maxWaitTime * 1000; - HDFSForceCompactionArgs args = new HDFSForceCompactionArgs(getRegionAdvisor().getBucketSet(), isMajor, waitTime); - HDFSForceCompactionResultCollector rc = new HDFSForceCompactionResultCollector(); - AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(this).withArgs(args).withCollector(rc); - execution.setWaitOnExceptionFlag(true); // wait for all exceptions - if (logger.isDebugEnabled()) { - logger.debug("HDFS: ForceCompat invoking function with arguments "+args); - } - execution.execute(HDFSForceCompactionFunction.ID); - List result = rc.getResult(); - Set successfulBuckets = rc.getSuccessfulBucketIds(); - if (rc.shouldRetry()) { - int retries = 0; - while (retries < HDFSForceCompactionFunction.FORCE_COMPACTION_MAX_RETRIES) { - waitTime -= System.currentTimeMillis() - start; - if (maxWaitTime > 0 && waitTime < 0) { - break; - } - start = System.currentTimeMillis(); - retries++; - Set retryBuckets = new HashSet(getRegionAdvisor().getBucketSet()); - retryBuckets.removeAll(successfulBuckets); - - for (int bucketId : retryBuckets) { - getNodeForBucketWrite(bucketId, new PartitionedRegion.RetryTimeKeeper(waitTime)); - long now = System.currentTimeMillis(); - waitTime -= now - start; - start = now; - } - - args = new HDFSForceCompactionArgs(retryBuckets, isMajor, waitTime); - rc = new HDFSForceCompactionResultCollector(); - execution = (AbstractExecution) FunctionService.onRegion(this).withArgs(args).withCollector(rc); - execution.setWaitOnExceptionFlag(true); // wait for all exceptions - if (logger.isDebugEnabled()) { - logger.debug("HDFS: ForceCompat re-invoking function with arguments "+args+" filter:"+retryBuckets); - } - execution.execute(HDFSForceCompactionFunction.ID); - result = rc.getResult(); - successfulBuckets.addAll(rc.getSuccessfulBucketIds()); - } - } - if (successfulBuckets.size() != getRegionAdvisor().getBucketSet().size()) { - checkReadiness(); - Set uncessfulBuckets = new HashSet(getRegionAdvisor().getBucketSet()); - uncessfulBuckets.removeAll(successfulBuckets); - throw new FunctionException("Could not run compaction on following buckets:"+uncessfulBuckets); - } - } - - /** - * Schedules compaction on local buckets - * @param buckets the set of buckets to compact - * @param isMajor true for major compaction - * @param time TODO use this - * @return a list of futures for the scheduled compaction tasks - */ - public List> forceLocalHDFSCompaction(Set buckets, boolean isMajor, long time) { - List> futures = new ArrayList>(); - if (!isDataStore() || hdfsManager == null || buckets == null || buckets.isEmpty()) { - if (logger.isDebugEnabled()) { - logger.debug( - "HDFS: did not schedule local " + (isMajor ? "Major" : "Minor") + " compaction"); - } - // nothing to do - return futures; - } - if (logger.isDebugEnabled()) { - logger.debug( - "HDFS: scheduling local " + (isMajor ? "Major" : "Minor") + " compaction for buckets:"+buckets); - } - Collection organizers = hdfsManager.getBucketOrganizers(buckets); - - for (HoplogOrganizer hoplogOrganizer : organizers) { - Future f = hoplogOrganizer.forceCompaction(isMajor); - futures.add(f); - } - return futures; - } - - @Override - public void flushHDFSQueue(int maxWaitTime) { - if (!this.isHDFSRegion()) { - throw new UnsupportedOperationException( - LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE - .toLocalizedString(getName())); - } - HDFSFlushQueueFunction.flushQueue(this, maxWaitTime); - } - - @Override - public long lastMajorHDFSCompaction() { - if (!this.isHDFSReadWriteRegion()) { - if (this.isHDFSRegion()) { - throw new UnsupportedOperationException( - LocalizedStrings.HOPLOG_CONFIGURED_AS_WRITEONLY - .toLocalizedString(getName())); - } - throw new UnsupportedOperationException( - LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE - .toLocalizedString(getName())); - } - List result = (List) FunctionService.onRegion(this) - .execute(HDFSLastCompactionTimeFunction.ID) - .getResult(); - if (logger.isDebugEnabled()) { - logger.debug("HDFS: Result of LastCompactionTimeFunction "+result); - } - long min = Long.MAX_VALUE; - for (long ts : result) { - if (ts !=0 && ts < min) { - min = ts; - } - } - min = min == Long.MAX_VALUE ? 0 : min; - return min; - } - - public long lastLocalMajorHDFSCompaction() { - if (!isDataStore() || hdfsManager == null) { - // nothing to do - return 0; - } - if (logger.isDebugEnabled()) { - logger.debug( - "HDFS: getting local Major compaction time"); - } - Collection organizers = hdfsManager.getBucketOrganizers(); - long minTS = Long.MAX_VALUE; - for (HoplogOrganizer hoplogOrganizer : organizers) { - long ts = hoplogOrganizer.getLastMajorCompactionTimestamp(); - if (ts !=0 && ts < minTS) { - minTS = ts; - } - } - minTS = minTS == Long.MAX_VALUE ? 0 : minTS; - if (logger.isDebugEnabled()) { - logger.debug( - "HDFS: local Major compaction time: "+minTS); - } - return minTS; - } - public void shadowPRWaitForBucketRecovery() { assert this.isShadowPR(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java index 57b1e71..bda68e3 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java @@ -64,7 +64,6 @@ import com.gemstone.gemfire.cache.execute.Function; import com.gemstone.gemfire.cache.execute.FunctionException; import com.gemstone.gemfire.cache.execute.ResultSender; import com.gemstone.gemfire.cache.query.QueryInvalidException; -import com.gemstone.gemfire.cache.hdfs.HDFSIOException; import com.gemstone.gemfire.cache.query.internal.IndexUpdater; import com.gemstone.gemfire.cache.query.internal.QCompiler; import com.gemstone.gemfire.cache.query.internal.index.IndexCreationData; @@ -2059,13 +2058,13 @@ public class PartitionedRegionDataStore implements HasCachePerfStats ForceReattemptException, PRLocallyDestroyedException { return getLocally(bucketId, key,aCallbackArgument, disableCopyOnRead, preferCD, requestingClient, - clientEvent, returnTombstones, false, false); + clientEvent, returnTombstones, false); } /** * Returns value corresponding to this key. * @param key * the key to look for - * @param preferCD + * @param preferCD * @param requestingClient the client making the request, or null * @param clientEvent client's event (for returning version tag) * @param returnTombstones whether tombstones should be returned @@ -2076,21 +2075,28 @@ public class PartitionedRegionDataStore implements HasCachePerfStats * @throws PrimaryBucketException if the locally managed bucket is not primary * @throws PRLocallyDestroyedException if the PartitionRegion is locally destroyed */ - public Object getLocally(int bucketId, final Object key, - final Object aCallbackArgument, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, - boolean returnTombstones, boolean opScopeIsLocal, boolean allowReadFromHDFS) throws PrimaryBucketException, + public Object getLocally(int bucketId, + final Object key, + final Object aCallbackArgument, + boolean disableCopyOnRead, + boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones, + boolean opScopeIsLocal) throws PrimaryBucketException, ForceReattemptException, PRLocallyDestroyedException { final BucketRegion bucketRegion = getInitializedBucketForId(key, Integer.valueOf(bucketId)); // check for primary (when a loader is present) done deeper in the BucketRegion Object ret=null; if (logger.isDebugEnabled()) { - logger.debug("getLocally: key {}) bucketId={}{}{} region {} returnTombstones {} allowReadFromHDFS {}", key, - this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, bucketRegion.getName(), returnTombstones, allowReadFromHDFS); + logger.debug("getLocally: key {}) bucketId={}{}{} region {} returnTombstones {} ", key, + this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, bucketRegion.getName(), returnTombstones); } invokeBucketReadHook(); try { - ret = bucketRegion.get(key, aCallbackArgument, true, disableCopyOnRead , preferCD, requestingClient, clientEvent, returnTombstones, opScopeIsLocal, allowReadFromHDFS, false); + ret = bucketRegion.get(key, aCallbackArgument, true, disableCopyOnRead , preferCD, requestingClient, clientEvent, returnTombstones, opScopeIsLocal, + false); checkIfBucketMoved(bucketRegion); } catch (RegionDestroyedException rde) { @@ -2122,7 +2128,11 @@ public class PartitionedRegionDataStore implements HasCachePerfStats * @throws PrimaryBucketException if the locally managed bucket is not primary * @see #getLocally(int, Object, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean) */ - public RawValue getSerializedLocally(KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws PrimaryBucketException, + public RawValue getSerializedLocally(KeyInfo keyInfo, + boolean doNotLockEntry, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) throws PrimaryBucketException, ForceReattemptException { final BucketRegion bucketRegion = getInitializedBucketForId(keyInfo.getKey(), keyInfo.getBucketId()); // check for primary (when loader is present) done deeper in the BucketRegion @@ -2133,7 +2143,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats invokeBucketReadHook(); try { - RawValue result = bucketRegion.getSerialized(keyInfo, true, doNotLockEntry, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS); + RawValue result = bucketRegion.getSerialized(keyInfo, true, doNotLockEntry, requestingClient, clientEvent, returnTombstones); checkIfBucketMoved(bucketRegion); return result; } catch (RegionDestroyedException rde) { @@ -2157,7 +2167,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats * @param access * true if caller wants last accessed time updated * @param allowTombstones whether a tombstoned entry can be returned - * + * * @throws ForceReattemptException * if bucket region is not present in this process * @return a RegionEntry for the given key, which will be null if the key is @@ -2168,7 +2178,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats * if the PartitionRegion is locally destroyed */ public EntrySnapshot getEntryLocally(int bucketId, final Object key, - boolean access, boolean allowTombstones, boolean allowReadFromHDFS) + boolean access, boolean allowTombstones) throws EntryNotFoundException, PrimaryBucketException, ForceReattemptException, PRLocallyDestroyedException { @@ -2181,12 +2191,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats EntrySnapshot res = null; RegionEntry ent = null; try { - if (allowReadFromHDFS) { - ent = bucketRegion.entries.getEntry(key); - } - else { - ent = bucketRegion.entries.getOperationalEntryInVM(key); - } + ent = bucketRegion.entries.getEntry(key); if (ent == null) { this.getPartitionedRegion().checkReadiness(); @@ -2296,14 +2301,8 @@ public class PartitionedRegionDataStore implements HasCachePerfStats try{ if (r != null) { Set keys = r.keySet(allowTombstones); - if (getPartitionedRegion().isHDFSReadWriteRegion()) { - // hdfs regions can't copy all keys into memory - ret = keys; - - } else { // A copy is made so that the bucket is free to move ret = new HashSet(r.keySet(allowTombstones)); - } checkIfBucketMoved(r); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java index f083268..de1f7d8 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java @@ -65,12 +65,19 @@ public class PartitionedRegionDataView extends LocalRegionDataView { } @Override - public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate, - boolean generateCallbacks, Object value, boolean disableCopyOnRead, - boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) { + public Object findObject(KeyInfo key, + LocalRegion r, + boolean isCreate, + boolean generateCallbacks, + Object value, + boolean disableCopyOnRead, + boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) { TXStateProxy tx = r.cache.getTXMgr().internalSuspend(); try { - return r.findObjectInSystem(key, isCreate, tx, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS); + return r.findObjectInSystem(key, isCreate, tx, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones); } finally { r.cache.getTXMgr().resume(tx); } @@ -82,10 +89,14 @@ public class PartitionedRegionDataView extends LocalRegionDataView { return pr.nonTXContainsKey(keyInfo); } @Override - public Object getSerializedValue(LocalRegion localRegion, KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, - EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException { + public Object getSerializedValue(LocalRegion localRegion, + KeyInfo keyInfo, + boolean doNotLockEntry, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) throws DataLocationException { PartitionedRegion pr = (PartitionedRegion)localRegion; - return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS); + return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, requestingClient, clientEvent, returnTombstones); } @Override public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew, @@ -118,7 +129,7 @@ public class PartitionedRegionDataView extends LocalRegionDataView { boolean allowTombstones) throws DataLocationException { PartitionedRegion pr = (PartitionedRegion)localRegion; return pr.getDataStore().getEntryLocally(keyInfo.getBucketId(), - keyInfo.getKey(), false, allowTombstones, true); + keyInfo.getKey(), false, allowTombstones); } @Override http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java index f0a6543..74c134b 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java @@ -626,27 +626,6 @@ final class ProxyRegionMap implements RegionMap { } @Override - public boolean isMarkedForEviction() { - throw new UnsupportedOperationException(LocalizedStrings - .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0 - .toLocalizedString(DataPolicy.EMPTY)); - } - - @Override - public void setMarkedForEviction() { - throw new UnsupportedOperationException(LocalizedStrings - .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0 - .toLocalizedString(DataPolicy.EMPTY)); - } - - @Override - public void clearMarkedForEviction() { - throw new UnsupportedOperationException(LocalizedStrings - .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0 - .toLocalizedString(DataPolicy.EMPTY)); - } - - @Override public boolean isValueNull() { throw new UnsupportedOperationException(LocalizedStrings.ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0.toLocalizedString(DataPolicy.EMPTY)); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java index 5838ead..bedbf81 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java @@ -35,7 +35,6 @@ import com.gemstone.gemfire.internal.offheap.StoredObject; import com.gemstone.gemfire.internal.offheap.annotations.Released; import com.gemstone.gemfire.internal.offheap.annotations.Retained; import com.gemstone.gemfire.internal.offheap.annotations.Unretained; -import com.gemstone.gemfire.cache.EvictionCriteria; /** * Internal interface for a region entry. @@ -415,25 +414,6 @@ public interface RegionEntry { public void setUpdateInProgress(final boolean underUpdate); /** - * Returns true if this entry has been marked for eviction for custom eviction - * via {@link EvictionCriteria}. - */ - public boolean isMarkedForEviction(); - - /** - * Marks this entry for eviction by custom eviction via - * {@link EvictionCriteria}. - */ - public void setMarkedForEviction(); - - /** - * Clears this entry as for eviction by custom eviction via - * {@link EvictionCriteria} or when an update is done after it was marked for - * eviction. - */ - public void clearMarkedForEviction(); - - /** * Event containing this RegionEntry is being passed through * dispatchListenerEvent for CacheListeners under RegionEntry lock. This is * used during deserialization for a VMCacheSerializable value contained by http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java index 2a7f0c4..7a97408 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java @@ -39,12 +39,6 @@ class RegionMapFactory { //.getDataPolicy().withPartitioning()); if (owner.isProxy() /*|| owner instanceof PartitionedRegion*/) { // TODO enabling this causes eviction tests to fail return new ProxyRegionMap(owner, attrs, internalRegionArgs); - } else if (internalRegionArgs.isReadWriteHDFSRegion()) { - if (owner.getEvictionController() == null) { - return new HDFSRegionMapImpl(owner, attrs, internalRegionArgs); - } - return new HDFSLRURegionMap(owner, attrs, internalRegionArgs); - //else if (owner.getEvictionController() != null && isNotPartitionedRegion) { } else if (owner.getEvictionController() != null ) { return new VMLRURegionMap(owner, attrs,internalRegionArgs); } else { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java index c754339..b565a2c 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java @@ -122,7 +122,7 @@ public final class RemoteGetMessage extends RemoteOperationMessageWithDirectRepl ((KeyWithRegionContext)this.key).setRegionContext(r); } KeyInfo keyInfo = r.getKeyInfo(key, cbArg); - val = r.getDataView().getSerializedValue(r, keyInfo, false, this.context, null, false, false/*for replicate regions*/); + val = r.getDataView().getSerializedValue(r, keyInfo, false, this.context, null, false /*for replicate regions*/); valueBytes = val instanceof RawValue ? (RawValue)val : new RawValue(val); if (logger.isTraceEnabled(LogMarker.DM)) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java index 983f928..2906ff6 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java @@ -113,7 +113,8 @@ public class TXEntry implements Region.Entry { checkTX(); // Object value = this.localRegion.getDeserialized(this.key, false, this.myTX, this.rememberReads); - @Unretained Object value = this.myTX.getDeserializedValue(keyInfo, this.localRegion, false, false, false, null, false, false, false); + @Unretained Object value = this.myTX.getDeserializedValue(keyInfo, this.localRegion, false, false, false, null, false, + false); if (value == null) { throw new EntryDestroyedException(this.keyInfo.getKey().toString()); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java index a67d3cc..617873c 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java @@ -1407,7 +1407,14 @@ public class TXState implements TXStateInterface { /* (non-Javadoc) * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean) */ - public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) { + public Object getDeserializedValue(KeyInfo keyInfo, + LocalRegion localRegion, + boolean updateStats, + boolean disableCopyOnRead, + boolean preferCD, + EntryEventImpl clientEvent, + boolean returnTombstones, + boolean retainResult) { TXEntryState tx = txReadEntry(keyInfo, localRegion, true, true/*create txEntry is absent*/); if (tx != null) { Object v = tx.getValue(keyInfo, localRegion, preferCD); @@ -1416,7 +1423,8 @@ public class TXState implements TXStateInterface { } return v; } else { - return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadFromHDFS, retainResult); + return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones, + retainResult); } } @@ -1425,15 +1433,19 @@ public class TXState implements TXStateInterface { * @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object) */ @Retained - public Object getSerializedValue(LocalRegion localRegion, KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, - boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException { + public Object getSerializedValue(LocalRegion localRegion, + KeyInfo keyInfo, + boolean doNotLockEntry, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) throws DataLocationException { final Object key = keyInfo.getKey(); TXEntryState tx = txReadEntry(keyInfo, localRegion, true,true/*create txEntry is absent*/); if (tx != null) { Object val = tx.getPendingValue(); if(val==null || Token.isInvalidOrRemoved(val)) { val = findObject(keyInfo,localRegion, val!=Token.INVALID, - true, val, false, false, requestingClient, clientEvent, false, allowReadFromHDFS); + true, val, false, false, requestingClient, clientEvent, false); } return val; } else { @@ -1441,7 +1453,7 @@ public class TXState implements TXStateInterface { // so we should never come here assert localRegion instanceof PartitionedRegion; PartitionedRegion pr = (PartitionedRegion)localRegion; - return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, null, null, returnTombstones, allowReadFromHDFS); + return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, null, null, returnTombstones); } } @@ -1519,9 +1531,17 @@ public class TXState implements TXStateInterface { /* (non-Javadoc) * @see com.gemstone.gemfire.internal.cache.TXStateInterface#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object) */ - public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate, - boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) { - return r.findObjectInSystem(key, isCreate, this, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS); + public Object findObject(KeyInfo key, + LocalRegion r, + boolean isCreate, + boolean generateCallbacks, + Object value, + boolean disableCopyOnRead, + boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) { + return r.findObjectInSystem(key, isCreate, this, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones); } private boolean readEntryAndCheckIfDestroyed(KeyInfo keyInfo, LocalRegion localRegion, http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java index 5da20d8..3fa9351 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java @@ -123,8 +123,14 @@ public interface TXStateInterface extends Synchronization, InternalDataView { * @param localRegion * @param updateStats TODO */ - public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, - boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadsFromHDFS, boolean retainResult); + public Object getDeserializedValue(KeyInfo keyInfo, + LocalRegion localRegion, + boolean updateStats, + boolean disableCopyOnRead, + boolean preferCD, + EntryEventImpl clientEvent, + boolean returnTombstones, + boolean retainResult); public TXEvent getEvent(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java index e66302e..0939ab0 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java @@ -341,9 +341,16 @@ public class TXStateProxyImpl implements TXStateProxy { /* (non-Javadoc) * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean) */ - public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, - boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) { - Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion, updateStats, disableCopyOnRead, preferCD, null, false, allowReadFromHDFS, retainResult); + public Object getDeserializedValue(KeyInfo keyInfo, + LocalRegion localRegion, + boolean updateStats, + boolean disableCopyOnRead, + boolean preferCD, + EntryEventImpl clientEvent, + boolean returnTombstones, + boolean retainResult) { + Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion, updateStats, disableCopyOnRead, preferCD, null, false, + retainResult); if (val != null) { // fixes bug 51057: TXStateStub on client always returns null, so do not increment // the operation count it will be incremented in findObject() @@ -599,13 +606,13 @@ public class TXStateProxyImpl implements TXStateProxy { * @see com.gemstone.gemfire.internal.cache.InternalDataView#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object) */ public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate, - boolean generateCallbacks, Object value, boolean disableCopyOnRead, - boolean preferCD, ClientProxyMembershipID requestingClient, - EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) { + boolean generateCallbacks, Object value, boolean disableCopyOnRead, + boolean preferCD, ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, boolean returnTombstones) { try { this.operationCount++; Object retVal = getRealDeal(key, r).findObject(key, r, isCreate, generateCallbacks, - value, disableCopyOnRead, preferCD, requestingClient, clientEvent, false, allowReadFromHDFS); + value, disableCopyOnRead, preferCD, requestingClient, clientEvent, false); trackBucketForTx(key); return retVal; } catch (TransactionDataRebalancedException | PrimaryBucketException re) { @@ -720,9 +727,14 @@ public class TXStateProxyImpl implements TXStateProxy { * (non-Javadoc) * @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object) */ - public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException { + public Object getSerializedValue(LocalRegion localRegion, + KeyInfo key, + boolean doNotLockEntry, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) throws DataLocationException { this.operationCount++; - return getRealDeal(key, localRegion).getSerializedValue(localRegion, key, doNotLockEntry, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS); + return getRealDeal(key, localRegion).getSerializedValue(localRegion, key, doNotLockEntry, requestingClient, clientEvent, returnTombstones); } /* (non-Javadoc) http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java index ac35425..0b226e0 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java @@ -184,8 +184,14 @@ public abstract class TXStateStub implements TXStateInterface { /* (non-Javadoc) * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean) */ - public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, - boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) { + public Object getDeserializedValue(KeyInfo keyInfo, + LocalRegion localRegion, + boolean updateStats, + boolean disableCopyOnRead, + boolean preferCD, + EntryEventImpl clientEvent, + boolean returnTombstones, + boolean retainResult) { // We never have a local value if we are a stub... return null; } @@ -373,10 +379,17 @@ public abstract class TXStateStub implements TXStateInterface { /* (non-Javadoc) * @see com.gemstone.gemfire.internal.cache.InternalDataView#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object) */ - public Object findObject(KeyInfo keyInfo, LocalRegion r, boolean isCreate, - boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, - EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) { - return getTXRegionStub(r).findObject(keyInfo,isCreate,generateCallbacks,value, preferCD, requestingClient, clientEvent, allowReadFromHDFS); + public Object findObject(KeyInfo keyInfo, + LocalRegion r, + boolean isCreate, + boolean generateCallbacks, + Object value, + boolean disableCopyOnRead, + boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) { + return getTXRegionStub(r).findObject(keyInfo,isCreate,generateCallbacks,value, preferCD, requestingClient, clientEvent); } /* (non-Javadoc) @@ -432,7 +445,12 @@ public abstract class TXStateStub implements TXStateInterface { * (non-Javadoc) * @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object) */ - public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) { + public Object getSerializedValue(LocalRegion localRegion, + KeyInfo key, + boolean doNotLockEntry, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java index a17650c..269f891 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java @@ -114,10 +114,6 @@ public abstract class UserSpecifiedRegionAttributes implements RegionAttrib */ private boolean hasCloningEnabled = false; - private boolean hasHDFSStoreName = false; - - private boolean hasHDFSWriteOnly = false; - /** * Whether this region has entry value compression. * @@ -526,7 +522,7 @@ public abstract class UserSpecifiedRegionAttributes implements RegionAttrib { this.hasDiskSynchronous = val; } - private static final int HAS_COUNT = 43; + private static final int HAS_COUNT = 41; public void initHasFields(UserSpecifiedRegionAttributes other) { @@ -602,22 +598,4 @@ public abstract class UserSpecifiedRegionAttributes implements RegionAttrib public List getIndexes() { return this.indexes; } - - public boolean hasHDFSStoreName() - { - return this.hasHDFSStoreName; - } - public void setHasHDFSStoreName(boolean val) - { - this.hasHDFSStoreName = val; - } - - public void setHasHDFSWriteOnly(boolean val) - { - this.hasHDFSWriteOnly = val; - } - public boolean hasHDFSWriteOnly() - { - return this.hasHDFSWriteOnly; - } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java index f587e39..54133cc 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java @@ -408,19 +408,6 @@ public class ValidatingDiskRegion extends DiskRegion implements DiskRecoveryStor // TODO Auto-generated method stub } @Override - public boolean isMarkedForEviction() { - // TODO Auto-generated method stub - return false; - } - @Override - public void setMarkedForEviction() { - // TODO Auto-generated method stub - } - @Override - public void clearMarkedForEviction() { - // TODO Auto-generated method stub - } - @Override public boolean isInvalid() { // TODO Auto-generated method stub return false; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java index d3078a9..ea47e91 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java @@ -299,7 +299,7 @@ public final class FetchBulkEntriesMessage extends PartitionMessage Object key = it.next(); VersionTagHolder clientEvent = new VersionTagHolder(); Object value = map.get(key, null, true, true, true, null, - clientEvent, allowTombstones, false); + clientEvent, allowTombstones); if (needToWriteBucketInfo) { DataSerializer.writePrimitiveInt(map.getId(), mos); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java index d7e50f1..3fef790 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java @@ -93,11 +93,9 @@ public final class GetMessage extends PartitionMessageWithDirectReply private boolean returnTombstones; - private boolean allowReadFromHDFS; // reuse some flags protected static final int HAS_LOADER = NOTIFICATION_ONLY; protected static final int CAN_START_TX = IF_NEW; - protected static final int READ_FROM_HDFS = IF_OLD; /** * Empty constructor to satisfy {@link DataSerializer} requirements @@ -106,15 +104,14 @@ public final class GetMessage extends PartitionMessageWithDirectReply } private GetMessage(InternalDistributedMember recipient, int regionId, - DirectReplyProcessor processor, - final Object key, final Object aCallbackArgument, ClientProxyMembershipID context, - boolean returnTombstones, boolean allowReadFromHDFS) { + DirectReplyProcessor processor, + final Object key, final Object aCallbackArgument, ClientProxyMembershipID context, + boolean returnTombstones) { super(recipient, regionId, processor); this.key = key; this.cbArg = aCallbackArgument; this.context = context; this.returnTombstones = returnTombstones; - this.allowReadFromHDFS = allowReadFromHDFS; } private static final boolean ORDER_PR_GETS = Boolean.getBoolean("gemfire.order-pr-gets"); @@ -191,7 +188,7 @@ public final class GetMessage extends PartitionMessageWithDirectReply KeyInfo keyInfo = r.getKeyInfo(key, cbArg); boolean lockEntry = forceUseOfPRExecutor || isDirectAck(); - val = r.getDataView().getSerializedValue(r, keyInfo, !lockEntry, this.context, event, returnTombstones, allowReadFromHDFS); + val = r.getDataView().getSerializedValue(r, keyInfo, !lockEntry, this.context, event, returnTombstones); if(val == BucketRegion.REQUIRES_ENTRY_LOCK) { Assert.assertTrue(!lockEntry); @@ -272,14 +269,12 @@ public final class GetMessage extends PartitionMessageWithDirectReply @Override protected short computeCompressedShort(short s) { s = super.computeCompressedShort(s); - if (this.allowReadFromHDFS) s |= READ_FROM_HDFS; return s; } @Override protected void setBooleans(short s, DataInput in) throws ClassNotFoundException, IOException { super.setBooleans(s, in); - if ((s & READ_FROM_HDFS) != 0) this.allowReadFromHDFS = true; } public void setKey(Object key) @@ -303,15 +298,18 @@ public final class GetMessage extends PartitionMessageWithDirectReply * @throws ForceReattemptException if the peer is no longer available */ public static GetResponse send(InternalDistributedMember recipient, - PartitionedRegion r, final Object key, final Object aCallbackArgument, - ClientProxyMembershipID requestingClient, boolean returnTombstones, boolean allowReadFromHDFS) + PartitionedRegion r, + final Object key, + final Object aCallbackArgument, + ClientProxyMembershipID requestingClient, + boolean returnTombstones) throws ForceReattemptException { Assert.assertTrue(recipient != null, "PRDistribuedGetReplyMessage NULL reply message"); GetResponse p = new GetResponse(r.getSystem(), Collections.singleton(recipient), key); GetMessage m = new GetMessage(recipient, r.getPRId(), p, - key, aCallbackArgument, requestingClient, returnTombstones, allowReadFromHDFS); + key, aCallbackArgument, requestingClient, returnTombstones); Set failures = r.getDistributionManager().putOutgoing(m); if (failures != null && failures.size() > 0) { throw new ForceReattemptException(LocalizedStrings.GetMessage_FAILED_SENDING_0.toLocalizedString(m)); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java index a88f96f..8aaf587 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java @@ -101,9 +101,8 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START; protected static final short SKIP_CALLBACKS = (HAS_BRIDGE_CONTEXT << 1); - protected static final short FETCH_FROM_HDFS = (SKIP_CALLBACKS << 1); //using the left most bit for IS_PUT_DML, the last available bit - protected static final short IS_PUT_DML = (short) (FETCH_FROM_HDFS << 1); + protected static final short IS_PUT_DML = (short) (SKIP_CALLBACKS << 1); private transient InternalDistributedSystem internalDs; @@ -118,9 +117,6 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply transient VersionedObjectList versions = null; - /** whether this operation should fetch oldValue from HDFS */ - private boolean fetchFromHDFS; - private boolean isPutDML; /** * Empty constructor to satisfy {@link DataSerializer}requirements @@ -129,7 +125,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply } public PutAllPRMessage(int bucketId, int size, boolean notificationOnly, - boolean posDup, boolean skipCallbacks, Object callbackArg, boolean fetchFromHDFS, boolean isPutDML) { + boolean posDup, boolean skipCallbacks, Object callbackArg, boolean isPutDML) { this.bucketId = Integer.valueOf(bucketId); putAllPRData = new PutAllEntryData[size]; this.notificationOnly = notificationOnly; @@ -137,8 +133,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply this.skipCallbacks = skipCallbacks; this.callbackArg = callbackArg; initTxMemberId(); - this.fetchFromHDFS = fetchFromHDFS; - this.isPutDML = isPutDML; + this.isPutDML = isPutDML; } public void addEntry(PutAllEntryData entry) { @@ -307,7 +302,6 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply s = super.computeCompressedShort(s); if (this.bridgeContext != null) s |= HAS_BRIDGE_CONTEXT; if (this.skipCallbacks) s |= SKIP_CALLBACKS; - if (this.fetchFromHDFS) s |= FETCH_FROM_HDFS; if (this.isPutDML) s |= IS_PUT_DML; return s; } @@ -317,7 +311,6 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply ClassNotFoundException { super.setBooleans(s, in); this.skipCallbacks = ((s & SKIP_CALLBACKS) != 0); - this.fetchFromHDFS = ((s & FETCH_FROM_HDFS) != 0); this.isPutDML = ((s & IS_PUT_DML) != 0); } @@ -495,9 +488,6 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply ev.setPutAllOperation(dpao); - // set the fetchFromHDFS flag - ev.setFetchFromHDFS(this.fetchFromHDFS); - // make sure a local update inserts a cache de-serializable ev.makeSerializedNewValue(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java index d5abaa1..a6a39dc 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java @@ -182,9 +182,6 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements private VersionTag versionTag; - /** whether this operation should fetch oldValue from HDFS*/ - private transient boolean fetchFromHDFS; - private transient boolean isPutDML; // additional bitmask flags used for serialization/deserialization @@ -208,7 +205,6 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements // masks there are taken // also switching the masks will impact backwards compatibility. Need to // verify if it is ok to break backwards compatibility - protected static final int FETCH_FROM_HDFS = getNextByteMask(HAS_CALLBACKARG); /* private byte[] oldValBytes; @@ -608,9 +604,6 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements this.originalSender = (InternalDistributedMember)DataSerializer .readObject(in); } - if ((extraFlags & FETCH_FROM_HDFS) != 0) { - this.fetchFromHDFS = true; - } this.eventId = new EventID(); InternalDataSerializer.invokeFromData(this.eventId, in); @@ -697,7 +690,6 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements extraFlags |= HAS_DELTA_WITH_FULL_VALUE; } if (this.originalSender != null) extraFlags |= HAS_ORIGINAL_SENDER; - if (this.event.isFetchFromHDFS()) extraFlags |= FETCH_FROM_HDFS; out.writeByte(extraFlags); DataSerializer.writeObject(getKey(), out); @@ -822,7 +814,6 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements ev.setCausedByMessage(this); ev.setInvokePRCallbacks(!notificationOnly); ev.setPossibleDuplicate(this.posDup); - ev.setFetchFromHDFS(this.fetchFromHDFS); ev.setPutDML(this.isPutDML); /*if (this.hasOldValue) { if (this.oldValueIsSerialized) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java deleted file mode 100644 index 5c199ae..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import org.apache.hadoop.hbase.util.Bytes; - -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator; - -/** - * Compares objects byte-by-byte. This is fast and sufficient for cases when - * lexicographic ordering is not important or the serialization is order- - * preserving. - * - */ -public class ByteComparator implements SerializedComparator { - @Override - public int compare(byte[] rhs, byte[] lhs) { - return compare(rhs, 0, rhs.length, lhs, 0, lhs.length); - } - - @Override - public int compare(byte[] r, int rOff, int rLen, byte[] l, int lOff, int lLen) { - return compareBytes(r, rOff, rLen, l, lOff, lLen); - } - - /** - * Compares two byte arrays element-by-element. - * - * @param r the right array - * @param rOff the offset of r - * @param rLen the length of r to compare - * @param l the left array - * @param lOff the offset of l - * @param lLen the length of l to compare - * @return -1 if r < l; 0 if r == l; 1 if r > 1 - */ - - public static int compareBytes(byte[] r, int rOff, int rLen, byte[] l, int lOff, int lLen) { - return Bytes.compareTo(r, rOff, rLen, l, lOff, lLen); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java deleted file mode 100644 index dacc208..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.util.Iterator; - -/** - * Provides an {@link Iterator} that allows access to the current iteration - * element. The implementor must provide access to the current element - * as well as a means to move to the next element. - * - * - * @param the element type - */ -public interface CursorIterator extends Iterator { - /** - * Returns the element at the current position. - * @return the current element - */ - E current(); - - /** - * Provides an iteration cursor by wrapping an {@link Iterator}. - * - * @param the element type - */ - public static class WrappedIterator implements CursorIterator { - /** the underlying iterator */ - private final Iterator src; - - /** the current iteration element */ - private E current; - - public WrappedIterator(Iterator src) { - this.src = src; - } - - @Override - public boolean hasNext() { - return src.hasNext(); - } - - @Override - public E next() { - current = src.next(); - return current; - } - - @Override - public E current() { - return current; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - /** - * Returns the unwrapped interator. - * @return the iterator - */ - public Iterator unwrap() { - return src; - } - } -}