Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B0561200AE3 for ; Thu, 5 May 2016 00:57:37 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AF2141609FC; Wed, 4 May 2016 22:57:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 53A2A160A02 for ; Thu, 5 May 2016 00:57:35 +0200 (CEST) Received: (qmail 63334 invoked by uid 500); 4 May 2016 22:57:34 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 63254 invoked by uid 99); 4 May 2016 22:57:34 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 May 2016 22:57:34 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id DFF091A5396 for ; Wed, 4 May 2016 22:57:33 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id cC4LFt9Z0WBu for ; Wed, 4 May 2016 22:57:07 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-eu.apache.org (ASF Mail Server at mx2-lw-eu.apache.org) with SMTP id 0420E60D22 for ; Wed, 4 May 2016 22:57:02 +0000 (UTC) Received: (qmail 57914 invoked by uid 99); 4 May 2016 22:57:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 May 2016 22:57:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E51ADDF97F; Wed, 4 May 2016 22:57:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: klund@apache.org To: commits@geode.incubator.apache.org Date: Wed, 04 May 2016 22:57:05 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/63] [abbrv] incubator-geode git commit: GEODE-1072: Removing HDFS related code archived-at: Wed, 04 May 2016 22:57:37 -0000 http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java index 3896800..c924be5 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java @@ -373,13 +373,20 @@ public final class HARegion extends DistributedRegion /** * @return the deserialized value - * @see DistributedRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean, boolean) + * @see LocalRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean) * */ @Override - protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate, - TXStateInterface txState, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, - boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) + protected Object findObjectInSystem(KeyInfo keyInfo, + boolean isCreate, + TXStateInterface txState, + boolean generateCallbacks, + Object localValue, + boolean disableCopyOnRead, + boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) throws CacheLoaderException, TimeoutException { Object value = null; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java deleted file mode 100644 index f6c6aa7..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache; - -import java.util.Collection; - -import org.apache.logging.log4j.Logger; - -import com.gemstone.gemfire.cache.CacheWriterException; -import com.gemstone.gemfire.cache.TimeoutException; -import com.gemstone.gemfire.i18n.LogWriterI18n; -import com.gemstone.gemfire.internal.cache.lru.EnableLRU; -import com.gemstone.gemfire.internal.cache.lru.LRUEntry; -import com.gemstone.gemfire.internal.cache.lru.NewLRUClockHand; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; - -/** - * Implementation of RegionMap that reads data from HDFS and adds LRU behavior - * - */ -public class HDFSLRURegionMap extends AbstractLRURegionMap implements HDFSRegionMap { - - private static final Logger logger = LogService.getLogger(); - - private final HDFSRegionMapDelegate delegate; - - /** - * A tool from the eviction controller for sizing entries and - * expressing limits. - */ - private EnableLRU ccHelper; - - /** The list of nodes in LRU order */ - private NewLRUClockHand lruList; - - private static final boolean DEBUG = Boolean.getBoolean("hdfsRegionMap.DEBUG"); - - public HDFSLRURegionMap(LocalRegion owner, Attributes attrs, - InternalRegionArguments internalRegionArgs) { - super(internalRegionArgs); - assert owner instanceof BucketRegion; - initialize(owner, attrs, internalRegionArgs); - this.delegate = new HDFSRegionMapDelegate(owner, attrs, internalRegionArgs, this); - } - - @Override - public RegionEntry getEntry(Object key) { - return delegate.getEntry(key, null); - } - - @Override - protected RegionEntry getEntry(EntryEventImpl event) { - return delegate.getEntry(event); - } - - @Override - @SuppressWarnings("unchecked") - public Collection regionEntries() { - return delegate.regionEntries(); - } - - @Override - public int size() { - return delegate.size(); - } - - @Override - public boolean isEmpty() { - return delegate.isEmpty(); - } - - @Override - protected void _setCCHelper(EnableLRU ccHelper) { - this.ccHelper = ccHelper; - } - - @Override - protected EnableLRU _getCCHelper() { - return this.ccHelper; - } - - @Override - protected void _setLruList(NewLRUClockHand lruList) { - this.lruList = lruList; - } - - @Override - protected NewLRUClockHand _getLruList() { - return this.lruList; - } - - @Override - public HDFSRegionMapDelegate getDelegate() { - return this.delegate; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java deleted file mode 100644 index 2a7baef..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache; - -/** - * Interface implemented by RegionMap implementations that - * read from HDFS. - * - * - */ -public interface HDFSRegionMap { - - /** - * @return the {@link HDFSRegionMapDelegate} that does - * all the work - */ - public HDFSRegionMapDelegate getDelegate(); -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java deleted file mode 100644 index a2ef653..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java +++ /dev/null @@ -1,540 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache; - -import java.io.IOException; -import java.lang.ref.Reference; -import java.lang.ref.ReferenceQueue; -import java.util.Collection; -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.logging.log4j.Logger; - -import com.gemstone.gemfire.cache.EvictionAction; -import com.gemstone.gemfire.cache.CustomEvictionAttributes; -import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; -import com.gemstone.gemfire.cache.hdfs.HDFSIOException; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSEntriesSet; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSEntriesSet.HDFSIterator; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue; -import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl; -import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent; -import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; -import com.gemstone.gemfire.i18n.LogWriterI18n; -import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType; -import com.gemstone.gemfire.internal.cache.RegionMap.Attributes; -import com.gemstone.gemfire.internal.cache.lru.EnableLRU; -import com.gemstone.gemfire.internal.cache.lru.LRUEntry; -import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector; -import com.gemstone.gemfire.internal.cache.versions.VersionStamp; -import com.gemstone.gemfire.internal.cache.versions.VersionTag; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor; -import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; -import com.gemstone.gemfire.internal.util.concurrent.FutureResult; - -/** - * This class encapsulates all the functionality of HDFSRegionMap, so - * that it can be provided to HDFSLRURegionMap. - * - */ -public class HDFSRegionMapDelegate { - - private static final Logger logger = LogService.getLogger(); - - private final BucketRegion owner; - - private ConcurrentParallelGatewaySenderQueue hdfsQueue; - - private final RegionMap backingRM; - - /** queue of dead iterators */ - private final ReferenceQueue refs; - - private static final boolean DEBUG = Boolean.getBoolean("hdfsRegionMap.DEBUG"); - - /** - * used for serializing fetches from HDFS - */ - private ConcurrentMap futures = new ConcurrentHashMap(); - - public HDFSRegionMapDelegate(LocalRegion owner, Attributes attrs, - InternalRegionArguments internalRegionArgs, RegionMap backingRM) { - assert owner instanceof BucketRegion; - this.owner = (BucketRegion) owner; - this.backingRM = backingRM; - refs = new ReferenceQueue(); - } - - public RegionEntry getEntry(Object key, EntryEventImpl event) { - - RegionEntry re = getEntry(key, event, true); - // get from tx should put the entry back in map - // it should be evicted once tx completes - /**MergeGemXDHDFSToGFE txstate does not apply for this*/ - /* if (re != null && getTXState(event) != null) { - if (re != null) { - // put the region entry in backing CHM of AbstractRegionMap so that - // it can be locked in basicPut/destroy - RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re); - if (oldRe != null) { - if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) { - ((OffHeapRegionEntry)re).release(); - } - return oldRe; - } - re.setMarkedForEviction(); - owner.updateSizeOnCreate(key, - owner.calculateRegionEntryValueSize(re)); - ((AbstractRegionMap)backingRM).incEntryCount(1); - ((AbstractRegionMap)backingRM).lruEntryCreate(re); - }*/ - return re; - } - - /* - private TXStateInterface getTXState(EntryEventImpl event) { - return event != null ? event.getTXState(this.owner) : this.owner - .getTXState(); - }*/ - - /** - * - * @param key - * @param event - * @param forceOnHeap if true will return heap version of off-heap region entries - */ - private RegionEntry getEntry(Object key, EntryEventImpl event, boolean forceOnHeap) { - closeDeadIterators(); - - RegionEntry re = backingRM.getEntryInVM(key); - if (logger.isTraceEnabled() || DEBUG) { - logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: Found the key in CHM: " + key - + " ,value=" + (re == null? "null" : "[" + re._getValue() + " or (" + re.getValueAsToken() + ")]"))); - } - if ((re == null || (re.isRemoved() && !re.isTombstone())) - && owner.getBucketAdvisor().isPrimary() && allowReadFromHDFS()) { - if (logger.isTraceEnabled() || DEBUG) { - logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: fetching from hdfs key:" + key)); - } - try { - this.owner.getPartitionedRegion().hdfsCalled(key); - re = getEntryFromFuture(key); - if (re != null) { - return re; - } - - assert this.owner.getPartitionedRegion().getDataPolicy() - .withHDFS(); - byte[] k = EntryEventImpl.serialize(key); - - // for destroy ops we will retain the entry in the region map so - // tombstones can be tracked - //final boolean forceOnHeap = (event==null || !event.getOperation().isDestroy()); - - // get from queue - re = getFromHDFSQueue(key, k, forceOnHeap); - if (re == null) { - // get from HDFS - re = getFromHDFS(key, k, forceOnHeap); - } - if (re != null && re.isTombstone()) { - RegionVersionVector vector = this.owner.getVersionVector(); -// if (vector == null) { -// this.owner.getLogWriterI18n().info(LocalizedStrings.DEBUG, -// "found a tombstone in a region w/o a version vector: " + re + "; region: " + this.owner); -// } - if (vector == null - || vector.isTombstoneTooOld(re.getVersionStamp().getMemberID(), - re.getVersionStamp().getRegionVersion())) { - re = null; - } - } - if (logger.isTraceEnabled() || DEBUG) { - logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: returning from hdfs re:" + re)); - } - } catch (ForceReattemptException e) { - throw new PrimaryBucketException(e.getLocalizedMessage(), e); - } catch (IOException e) { - throw new HDFSIOException("Error reading from HDFS", e); - } finally { - notifyFuture(key, re); - // If we mark it here, the table scan may miss it causing updates/delete using table - // scan to fail. -// if (re != null) { -// re.setMarkedForEviction(); -// } - if(re != null && event != null && !re.isTombstone()) { - if (logger.isTraceEnabled() || DEBUG) { - logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: loaded from hdfs re:" + re)); - } - BucketRegion br = (BucketRegion)owner; - //CustomEvictionAttributes csAttr = br.getCustomEvictionAttributes(); - //if(csAttr!=null) - event.setLoadedFromHDFS(true); - } - } - } - if(re!=null && re.isMarkedForEviction() && !re.isTombstone()) { - if(event!=null) { - event.setLoadedFromHDFS(true); - } - } - - return re; - } - - /** - * This method returns true if the RegionEntry should be read from HDFS. - * fixes #49101 by not allowing reads from HDFS for persistent regions - * that do not define an eviction criteria. - * - * @return true if RegionEntry should be read from HDFS - */ - private boolean allowReadFromHDFS() { - if (!owner.getDataPolicy().withPersistence() - || owner.getCustomEvictionAttributes() != null - || isEvictionActionLocalDestroy()){ - /**MergeGemXDHDFSToGFE this is used for global index. Hence not required here*/ - //|| owner.isUsedForIndex()) { - // when region does not have persistence, we have to read from HDFS (even - // though there is no eviction criteria) for constraint checks - return true; - } - return false; - } - - private boolean isEvictionActionLocalDestroy() { - PartitionedRegion pr = owner.getPartitionedRegion(); - if (pr.getEvictionAttributes() != null) { - return pr.getEvictionAttributes().getAction() == EvictionAction.LOCAL_DESTROY; - } - return false; - } - - protected RegionEntry getEntry(EntryEventImpl event) { - RegionEntry re = getEntry(event.getKey(), event, false); - if (re != null && event.isLoadedFromHDFS()) { - // put the region entry in backing CHM of AbstractRegionMap so that - // it can be locked in basicPut/destroy - RegionEntry oldRe = backingRM.putEntryIfAbsent(event.getKey(), re); - if (oldRe != null) { - if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) { - ((OffHeapRegionEntry) re).release(); - } - return oldRe; - } - // since the entry is faulted in from HDFS, it must have - // satisfied the eviction criteria in the past, so mark it for eviction - re.setMarkedForEviction(); - - owner.updateSizeOnCreate(event.getKey(), owner.calculateRegionEntryValueSize(re)); - ((AbstractRegionMap) backingRM).incEntryCount(1); - ((AbstractRegionMap) backingRM).lruEntryCreate(re); - } - return re; - } - - @SuppressWarnings("unchecked") - public Collection regionEntries() { - closeDeadIterators(); - if (!owner.getPartitionedRegion().includeHDFSResults()) { - if (logger.isDebugEnabled() || DEBUG) { - logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #regionEntries")); - } - return backingRM.regionEntriesInVM(); - } - - try { - return createEntriesSet(IteratorType.ENTRIES); - } catch (ForceReattemptException e) { - throw new PrimaryBucketException(e.getLocalizedMessage(), e); - } - } - - public int size() { - closeDeadIterators(); - if (!owner.getPartitionedRegion().includeHDFSResults()) { - if (logger.isDebugEnabled() || DEBUG) { - logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #size")); - } - return backingRM.sizeInVM(); - } - - try { - return createEntriesSet(IteratorType.KEYS).size(); - } catch (ForceReattemptException e) { - throw new PrimaryBucketException(e.getLocalizedMessage(), e); - } - } - - public boolean isEmpty() { - closeDeadIterators(); - if (!owner.getPartitionedRegion().includeHDFSResults()) { - if (logger.isDebugEnabled() || DEBUG) { - logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #isEmpty")); - } - return backingRM.sizeInVM() == 0; - } - - try { - return createEntriesSet(IteratorType.KEYS).isEmpty(); - } catch (ForceReattemptException e) { - throw new PrimaryBucketException(e.getLocalizedMessage(), e); - } - } - - private void notifyFuture(Object key, RegionEntry re) { - FutureResult future = this.futures.remove(key); - if (future != null) { - future.set(re); - } - } - - private RegionEntry getEntryFromFuture(Object key) { - FutureResult future = new FutureResult(this.owner.getCancelCriterion()); - FutureResult old = this.futures.putIfAbsent(key, future); - if (old != null) { - if (logger.isTraceEnabled() || DEBUG) { - logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: waiting for concurrent fetch to complete for key:" + key)); - } - try { - return (RegionEntry) old.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - this.owner.getCache().getCancelCriterion().checkCancelInProgress(null); - } - } - return null; - } - - private RegionEntry getFromHDFS(Object key, byte[] k, boolean forceOnHeap) throws IOException, ForceReattemptException { - SortedHoplogPersistedEvent ev; - try { - ev = (SortedHoplogPersistedEvent) owner.getHoplogOrganizer().read(k); - } catch (IOException e) { - owner.checkForPrimary(); - throw e; - } - if (ev != null) { - if (logger.isTraceEnabled() || DEBUG) { - logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: got from hdfs ev:" + ev)); - } - return getEntryFromEvent(key, ev, forceOnHeap, false); - } - return null; - } - - /** - * set the versionTag on the newly faulted-in entry - */ - private void setVersionTag(RegionEntry re, VersionTag versionTag) { - if (owner.concurrencyChecksEnabled) { - versionTag.setMemberID( - owner.getVersionVector().getCanonicalId(versionTag.getMemberID())); - VersionStamp versionedRe = (VersionStamp) re; - versionedRe.setVersions(versionTag); - } - } - - private RegionEntry getFromHDFSQueue(Object key, byte[] k, boolean forceOnHeap) throws ForceReattemptException { - ConcurrentParallelGatewaySenderQueue q = getHDFSQueue(); - if (q == null) return null; - HDFSGatewayEventImpl hdfsGatewayEvent = (HDFSGatewayEventImpl) q.get(owner.getPartitionedRegion(), k, owner.getId()); - if (hdfsGatewayEvent != null) { - if (logger.isTraceEnabled() || DEBUG) { - logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: got from hdfs queue: " + hdfsGatewayEvent)); - } - return getEntryFromEvent(key, hdfsGatewayEvent, forceOnHeap, false); - } - return null; - } - - private ConcurrentParallelGatewaySenderQueue getHDFSQueue() - throws ForceReattemptException { - if (this.hdfsQueue == null) { - String asyncQId = this.owner.getPartitionedRegion().getHDFSEventQueueName(); - final AsyncEventQueueImpl asyncQ = (AsyncEventQueueImpl)this.owner.getCache().getAsyncEventQueue(asyncQId); - final AbstractGatewaySender gatewaySender = (AbstractGatewaySender)asyncQ.getSender(); - AbstractGatewaySenderEventProcessor ep = gatewaySender.getEventProcessor(); - if (ep == null) return null; - hdfsQueue = (ConcurrentParallelGatewaySenderQueue)ep.getQueue(); - } - - // Check whether the queue has become primary here. - // There could be some time between bucket becoming a primary - // and underlying queue becoming a primary, so isPrimaryWithWait() - // waits for some time for the queue to become a primary on this member - final HDFSBucketRegionQueue brq = hdfsQueue.getBucketRegionQueue( - this.owner.getPartitionedRegion(), this.owner.getId()); - if (brq != null) { - if (owner.getBucketAdvisor().isPrimary() - && !brq.getBucketAdvisor().isPrimaryWithWait()) { - InternalDistributedMember primaryHolder = brq.getBucketAdvisor() - .basicGetPrimaryMember(); - throw new PrimaryBucketException("Bucket " + brq.getName() - + " is not primary. Current primary holder is " + primaryHolder); - } - } - - return hdfsQueue; - } - - public RegionEntry getEntryFromEvent(Object key, HDFSGatewayEventImpl event, boolean forceOnHeap, boolean forUpdate) { - Object val; - if (event.getOperation().isDestroy()) { - val = Token.TOMBSTONE; - } else if (event.getOperation().isInvalidate()) { - val = Token.INVALID; - } else { - val = event.getValue(); - } - RegionEntry re = null; - final TXStateInterface tx = owner.getTXState(); - if (tx == null) { - re = createRegionEntry(key, val, event.getVersionTag(), forceOnHeap); - return re; - } - else - if (val != null) { - if (((re = this.backingRM.getEntryInVM(key)) == null) - || (re.isRemoved() && !re.isTombstone())) { - boolean shouldCreateOnHeapEntry = !(owner.getOffHeap() && forUpdate); - re = createRegionEntry(key, val, event.getVersionTag(), shouldCreateOnHeapEntry); - if (forUpdate) { - if (re != null && tx != null) { - // put the region entry in backing CHM of AbstractRegionMap so that - // it can be locked in basicPut/destroy - RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re); - if (oldRe != null) { - if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) { - ((OffHeapRegionEntry)re).release(); - } - return oldRe; - } - re.setMarkedForEviction(); - owner.updateSizeOnCreate(key, - owner.calculateRegionEntryValueSize(re)); - ((AbstractRegionMap)backingRM).incEntryCount(1); - ((AbstractRegionMap)backingRM).lruEntryCreate(re); - } - } - } - } - return re; - } - - public RegionEntry getEntryFromEvent(Object key, SortedHoplogPersistedEvent event, boolean forceOnHeap, boolean forUpdate) { - Object val = getValueFromEvent(event); - RegionEntry re = null; - final TXStateInterface tx = owner.getTXState(); - if (tx == null) { - re = createRegionEntry(key, val, event.getVersionTag(), forceOnHeap); - return re; - } - else // FOR TX case, we need to create off heap entry if required - if (val != null) { - if (((re = this.backingRM.getEntryInVM(key)) == null) - || (re.isRemoved() && !re.isTombstone())) { - boolean shouldCreateOnHeapEntry = !(owner.getOffHeap() && forUpdate); - re = createRegionEntry(key, val, event.getVersionTag(), shouldCreateOnHeapEntry); - if(forUpdate) { - if (re != null && tx != null) { - // put the region entry in backing CHM of AbstractRegionMap so that - // it can be locked in basicPut/destroy - RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re); - if (oldRe != null) { - if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) { - ((OffHeapRegionEntry)re).release(); - } - return oldRe; - } - re.setMarkedForEviction(); - owner.updateSizeOnCreate(key, - owner.calculateRegionEntryValueSize(re)); - ((AbstractRegionMap)backingRM).incEntryCount(1); - ((AbstractRegionMap)backingRM).lruEntryCreate(re); - } - } - } - } - return re; - } - - private RegionEntry createRegionEntry(Object key, Object value, VersionTag tag, boolean forceOnHeap) { - RegionEntryFactory ref = backingRM.getEntryFactory(); - if (forceOnHeap) { - ref = ref.makeOnHeap(); - } - value = getValueDuringGII(key, value); - RegionEntry re = ref.createEntry(this.owner, key, value); - setVersionTag(re, tag); - if (re instanceof LRUEntry) { - assert backingRM instanceof AbstractLRURegionMap; - EnableLRU ccHelper = ((AbstractLRURegionMap)backingRM)._getCCHelper(); - ((LRUEntry)re).updateEntrySize(ccHelper); - } - return re; - } - - private Object getValueDuringGII(Object key, Object value) { - if (owner.getIndexUpdater() != null && !owner.isInitialized()) { - return AbstractRegionMap.listOfDeltasCreator.newValue(key, owner, value, - null); - } - return value; - } - - private Set createEntriesSet(IteratorType type) - throws ForceReattemptException { - ConcurrentParallelGatewaySenderQueue q = getHDFSQueue(); - if (q == null) return Collections.emptySet(); - HDFSBucketRegionQueue brq = q.getBucketRegionQueue(this.owner.getPartitionedRegion(), owner.getId()); - return new HDFSEntriesSet(owner, brq, owner.getHoplogOrganizer(), type, refs); - } - - private void closeDeadIterators() { - Reference weak; - while ((weak = refs.poll()) != null) { - if (logger.isTraceEnabled() || DEBUG) { - logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Closing weak ref for iterator " - + weak.get())); - } - weak.get().close(); - } - } - - /** - * gets the value from event, deserializing if necessary. - */ - private Object getValueFromEvent(PersistedEventImpl ev) { - if (ev.getOperation().isDestroy()) { - return Token.TOMBSTONE; - } else if (ev.getOperation().isInvalidate()) { - return Token.INVALID; - } - return ev.getValue(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java deleted file mode 100644 index 9336ed7..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache; - -import java.util.Collection; - -import com.gemstone.gemfire.cache.CacheWriterException; -import com.gemstone.gemfire.cache.TimeoutException; -import com.gemstone.gemfire.internal.size.SingleObjectSizer; - -/** - * Implementation of RegionMap that reads data from HDFS. - * - */ -public class HDFSRegionMapImpl extends AbstractRegionMap implements HDFSRegionMap { - - private final HDFSRegionMapDelegate delegate; - - private static final boolean DEBUG = Boolean.getBoolean("hdfsRegionMap.DEBUG"); - - public HDFSRegionMapImpl(LocalRegion owner, Attributes attrs, - InternalRegionArguments internalRegionArgs) { - super(internalRegionArgs); - assert owner instanceof BucketRegion; - initialize(owner, attrs, internalRegionArgs, false); - this.delegate = new HDFSRegionMapDelegate(owner, attrs, internalRegionArgs, this); - } - - @Override - public RegionEntry getEntry(Object key) { - return delegate.getEntry(key, null); - } - - @Override - protected RegionEntry getEntry(EntryEventImpl event) { - return delegate.getEntry(event); - } - - @Override - @SuppressWarnings("unchecked") - public Collection regionEntries() { - return delegate.regionEntries(); - } - - @Override - public int size() { - return delegate.size(); - } - - @Override - public boolean isEmpty() { - return delegate.isEmpty(); - } - - @Override - public HDFSRegionMapDelegate getDelegate() { - return this.delegate; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java index 36eee80..bda5a27 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java @@ -20,8 +20,6 @@ package com.gemstone.gemfire.internal.cache; import java.util.Collection; import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector; import com.gemstone.gemfire.cache.query.internal.cq.CqService; import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.internal.cache.extension.Extensible; @@ -45,7 +43,5 @@ public interface InternalCache extends Cache, Extensible { public CqService getCqService(); - public Collection getHDFSStores() ; - public T getService(Class clazz); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java index e506f2e..0885477 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java @@ -39,17 +39,22 @@ public interface InternalDataView { * @param keyInfo * @param localRegion * @param updateStats - * @param disableCopyOnRead - * @param preferCD + * @param disableCopyOnRead + * @param preferCD * @param clientEvent TODO * @param returnTombstones TODO * @param retainResult if true then the result may be a retained off-heap reference * @return the object associated with the key */ @Retained - Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, - boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, - boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult); + Object getDeserializedValue(KeyInfo keyInfo, + LocalRegion localRegion, + boolean updateStats, + boolean disableCopyOnRead, + boolean preferCD, + EntryEventImpl clientEvent, + boolean returnTombstones, + boolean retainResult); /** * @param event @@ -182,8 +187,8 @@ public interface InternalDataView { * @return the Object associated with the key */ Object findObject(KeyInfo key, LocalRegion r, boolean isCreate, boolean generateCallbacks, - Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, - EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS); + Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, boolean returnTombstones); /** @@ -224,13 +229,18 @@ public interface InternalDataView { * * @param localRegion * @param key - * @param doNotLockEntry + * @param doNotLockEntry * @param requestingClient the client that made the request, or null if not from a client * @param clientEvent the client event, if any * @param returnTombstones TODO * @return the serialized value from the cache */ - Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException; + Object getSerializedValue(LocalRegion localRegion, + KeyInfo key, + boolean doNotLockEntry, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) throws DataLocationException; abstract void checkSupportsRegionDestroy() throws UnsupportedOperationInTransactionException; abstract void checkSupportsRegionInvalidate() throws UnsupportedOperationInTransactionException; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java index 41e763d..f7d46fe 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java @@ -37,7 +37,6 @@ public final class InternalRegionArguments private boolean isUsedForPartitionedRegionAdmin; private boolean isUsedForSerialGatewaySenderQueue; private boolean isUsedForParallelGatewaySenderQueue; - private boolean isUsedForHDFSParallelGatewaySenderQueue = false; private int bucketRedundancy; private boolean isUsedForPartitionedRegionBucket; private RegionAdvisor partitionedRegionAdvisor; @@ -273,26 +272,11 @@ public final class InternalRegionArguments this.isUsedForParallelGatewaySenderQueue = queueFlag; return this; } - public InternalRegionArguments setIsUsedForHDFSParallelGatewaySenderQueue( - boolean queueFlag) { - this.isUsedForHDFSParallelGatewaySenderQueue = queueFlag; - return this; - } public boolean isUsedForParallelGatewaySenderQueue() { return this.isUsedForParallelGatewaySenderQueue; } - public boolean isUsedForHDFSParallelGatewaySenderQueue() { - return this.isUsedForHDFSParallelGatewaySenderQueue; - } - - public boolean isReadWriteHDFSRegion() { - return isUsedForPartitionedRegionBucket() - && getPartitionedRegion().getHDFSStoreName() != null - && !getPartitionedRegion().getHDFSWriteOnly(); - } - public InternalRegionArguments setParallelGatewaySender( AbstractGatewaySender pgSender) { this.parallelGatewaySender = pgSender; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java index b3de9b7..3ad294c 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java @@ -116,11 +116,6 @@ import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy; import com.gemstone.gemfire.cache.control.ResourceManager; import com.gemstone.gemfire.cache.execute.Function; import com.gemstone.gemfire.cache.execute.ResultCollector; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil; -import com.gemstone.gemfire.cache.hdfs.internal.HoplogListenerForRegion; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager; import com.gemstone.gemfire.cache.partition.PartitionRegionHelper; import com.gemstone.gemfire.cache.query.FunctionDomainException; import com.gemstone.gemfire.cache.query.Index; @@ -465,10 +460,6 @@ public class LocalRegion extends AbstractRegion // Lock for updating PR MetaData on client side public final Lock clientMetaDataLock = new ReentrantLock(); - - protected HdfsRegionManager hdfsManager; - protected HoplogListenerForRegion hoplogListener; - /** * There seem to be cases where a region can be created and yet the * distributed system is not yet in place... @@ -641,7 +632,6 @@ public class LocalRegion extends AbstractRegion } } - this.hdfsManager = initHDFSManager(); this.dsi = findDiskStore(attrs, internalRegionArgs); this.diskRegion = createDiskRegion(internalRegionArgs); this.entries = createRegionMap(internalRegionArgs); @@ -696,22 +686,8 @@ public class LocalRegion extends AbstractRegion } - private HdfsRegionManager initHDFSManager() { - HdfsRegionManager hdfsMgr = null; - if (this.getHDFSStoreName() != null) { - this.hoplogListener = new HoplogListenerForRegion(); - HDFSRegionDirector.getInstance().setCache(cache); - hdfsMgr = HDFSRegionDirector.getInstance().manageRegion(this, - this.getHDFSStoreName(), hoplogListener); - } - return hdfsMgr; - } - private RegionMap createRegionMap(InternalRegionArguments internalRegionArgs) { RegionMap result = null; - if ((internalRegionArgs.isReadWriteHDFSRegion()) && this.diskRegion != null) { - this.diskRegion.setEntriesMapIncompatible(true); - } if (this.diskRegion != null) { result = this.diskRegion.useExistingRegionMap(this); } @@ -977,11 +953,6 @@ public class LocalRegion extends AbstractRegion existing = (LocalRegion)this.subregions.get(subregionName); if (existing == null) { - // create the async queue for HDFS if required. - HDFSIntegrationUtil.createAndAddAsyncQueue(regionPath, - regionAttributes, this.cache); - regionAttributes = cache.setEvictionAttributesForLargeRegion( - regionAttributes); if (regionAttributes.getScope().isDistributed() && internalRegionArgs.isUsedForPartitionedRegionBucket()) { final PartitionedRegion pr = internalRegionArgs @@ -991,15 +962,8 @@ public class LocalRegion extends AbstractRegion internalRegionArgs.setKeyRequiresRegionContext(pr .keyRequiresRegionContext()); if (pr.isShadowPR()) { - if (!pr.isShadowPRForHDFS()) { - newRegion = new BucketRegionQueue(subregionName, regionAttributes, - this, this.cache, internalRegionArgs); - } - else { - newRegion = new HDFSBucketRegionQueue(subregionName, regionAttributes, - this, this.cache, internalRegionArgs); - } - + newRegion = new BucketRegionQueue(subregionName, regionAttributes, + this, this.cache, internalRegionArgs); } else { newRegion = new BucketRegion(subregionName, regionAttributes, this, this.cache, internalRegionArgs); @@ -1134,7 +1098,6 @@ public class LocalRegion extends AbstractRegion if (event.getEventId() == null && generateEventID()) { event.setNewEventId(cache.getDistributedSystem()); } - assert event.isFetchFromHDFS() : "validatedPut() should have been called"; // Fix for 42448 - Only make create with null a local invalidate for // normal regions. Otherwise, it will become a distributed invalidate. if (getDataPolicy() == DataPolicy.NORMAL) { @@ -1261,18 +1224,20 @@ public class LocalRegion extends AbstractRegion * @param retainResult if true then the result may be a retained off-heap reference * @return the value for the given key */ - public final Object getDeserializedValue(RegionEntry re, final KeyInfo keyInfo, final boolean updateStats, boolean disableCopyOnRead, - boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) { + public final Object getDeserializedValue(RegionEntry re, + final KeyInfo keyInfo, + final boolean updateStats, + boolean disableCopyOnRead, + boolean preferCD, + EntryEventImpl clientEvent, + boolean returnTombstones, + boolean retainResult) { if (this.diskRegion != null) { this.diskRegion.setClearCountReference(); } try { if (re == null) { - if (allowReadFromHDFS) { - re = this.entries.getEntry(keyInfo.getKey()); - } else { - re = this.entries.getOperationalEntryInVM(keyInfo.getKey()); - } + re = this.entries.getEntry(keyInfo.getKey()); } //skip updating the stats if the value is null // TODO - We need to clean up the callers of the this class so that we can @@ -1382,7 +1347,7 @@ public class LocalRegion extends AbstractRegion public Object get(Object key, Object aCallbackArgument, boolean generateCallbacks, EntryEventImpl clientEvent) throws TimeoutException, CacheLoaderException { - Object result = get(key, aCallbackArgument, generateCallbacks, false, false, null, clientEvent, false, true/*allowReadFromHDFS*/); + Object result = get(key, aCallbackArgument, generateCallbacks, false, false, null, clientEvent, false); if (Token.isInvalid(result)) { result = null; } @@ -1392,11 +1357,16 @@ public class LocalRegion extends AbstractRegion /* * @see BucketRegion#getSerialized(KeyInfo, boolean, boolean) */ - public Object get(Object key, Object aCallbackArgument, - boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD, - ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws TimeoutException, CacheLoaderException { + public Object get(Object key, + Object aCallbackArgument, + boolean generateCallbacks, + boolean disableCopyOnRead, + boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) throws TimeoutException, CacheLoaderException { return get(key, aCallbackArgument, - generateCallbacks, disableCopyOnRead, preferCD,requestingClient, clientEvent, returnTombstones, false, allowReadFromHDFS, false); + generateCallbacks, disableCopyOnRead, preferCD,requestingClient, clientEvent, returnTombstones, false, false); } /** @@ -1418,16 +1388,17 @@ public class LocalRegion extends AbstractRegion public Object getRetained(Object key, Object aCallbackArgument, boolean generateCallbacks, boolean disableCopyOnRead, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean opScopeIsLocal) throws TimeoutException, CacheLoaderException { - return get(key, aCallbackArgument, generateCallbacks, disableCopyOnRead, true, requestingClient, clientEvent, returnTombstones, opScopeIsLocal, true, false/* see GEODE-1291*/); + return get(key, aCallbackArgument, generateCallbacks, disableCopyOnRead, true, requestingClient, clientEvent, returnTombstones, opScopeIsLocal, + false /* see GEODE-1291*/); } /** * @param opScopeIsLocal if true then just check local storage for a value; if false then try to find the value if it is not local * @param retainResult if true then the result may be a retained off-heap reference. */ public Object get(Object key, Object aCallbackArgument, - boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD, - ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, - boolean opScopeIsLocal, boolean allowReadFromHDFS, boolean retainResult) throws TimeoutException, CacheLoaderException + boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD, + ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, + boolean opScopeIsLocal, boolean retainResult) throws TimeoutException, CacheLoaderException { assert !retainResult || preferCD; validateKey(key); @@ -1440,7 +1411,8 @@ public class LocalRegion extends AbstractRegion boolean isMiss = true; try { KeyInfo keyInfo = getKeyInfo(key, aCallbackArgument); - Object value = getDataView().getDeserializedValue(keyInfo, this, true, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadFromHDFS, retainResult); + Object value = getDataView().getDeserializedValue(keyInfo, this, true, disableCopyOnRead, preferCD, clientEvent, returnTombstones, + retainResult); final boolean isCreate = value == null; isMiss = value == null || Token.isInvalid(value) || (!returnTombstones && value == Token.TOMBSTONE); @@ -1453,13 +1425,13 @@ public class LocalRegion extends AbstractRegion // if scope is local and there is no loader, then // don't go further to try and get value if (!opScopeIsLocal - && ((getScope().isDistributed() && !isHDFSRegion()) + && ((getScope().isDistributed()) || hasServerProxy() || basicGetLoader() != null)) { // serialize search/load threads if not in txn value = getDataView().findObject(keyInfo, this, isCreate, generateCallbacks, value, disableCopyOnRead, - preferCD, requestingClient, clientEvent, returnTombstones, false/*allowReadFromHDFS*/); + preferCD, requestingClient, clientEvent, returnTombstones); if (!returnTombstones && value == Token.TOMBSTONE) { value = null; } @@ -1485,7 +1457,7 @@ public class LocalRegion extends AbstractRegion */ final public void recordMiss(final RegionEntry re, Object key) { final RegionEntry e; - if (re == null && !isTX() && !isHDFSRegion()) { + if (re == null && !isTX()) { e = basicGetEntry(key); } else { e = re; @@ -1494,60 +1466,30 @@ public class LocalRegion extends AbstractRegion } /** - * @return true if this region has been configured for HDFS persistence - */ - public boolean isHDFSRegion() { - return false; - } - - /** - * @return true if this region is configured to read and write data from HDFS - */ - public boolean isHDFSReadWriteRegion() { - return false; - } - - /** - * @return true if this region is configured to only write to HDFS - */ - protected boolean isHDFSWriteOnly() { - return false; - } - - /** - * FOR TESTING ONLY - */ - public HoplogListenerForRegion getHoplogListener() { - return hoplogListener; - } - - /** - * FOR TESTING ONLY - */ - public HdfsRegionManager getHdfsRegionManager() { - return hdfsManager; - } - - /** * optimized to only allow one thread to do a search/load, other threads wait * on a future - * - * @param keyInfo + * @param keyInfo * @param p_isCreate * true if call found no entry; false if updating an existing * entry * @param generateCallbacks * @param p_localValue - * the value retrieved from the region for this object. +* the value retrieved from the region for this object. * @param disableCopyOnRead if true then do not make a copy * @param preferCD true if the preferred result form is CachedDeserializable * @param clientEvent the client event, if any * @param returnTombstones whether to return tombstones */ @Retained - Object nonTxnFindObject(KeyInfo keyInfo, boolean p_isCreate, - boolean generateCallbacks, Object p_localValue, boolean disableCopyOnRead, boolean preferCD, - ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) + Object nonTxnFindObject(KeyInfo keyInfo, + boolean p_isCreate, + boolean generateCallbacks, + Object p_localValue, + boolean disableCopyOnRead, + boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) throws TimeoutException, CacheLoaderException { final Object key = keyInfo.getKey(); @@ -1606,7 +1548,8 @@ public class LocalRegion extends AbstractRegion try { boolean partitioned = this.getDataPolicy().withPartitioning(); if (!partitioned) { - localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false, false/*allowReadFromHDFS*/, false); + localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false, + false); // stats have now been updated if (localValue != null && !Token.isInvalid(localValue)) { @@ -1615,7 +1558,7 @@ public class LocalRegion extends AbstractRegion } isCreate = localValue == null; result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks, - localValue, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false/*allowReadFromHDFS*/); + localValue, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones); } else { @@ -1623,7 +1566,7 @@ public class LocalRegion extends AbstractRegion // For PRs we don't want to deserialize the value and we can't use findObjectInSystem because // it can invoke code that is transactional. result = getSharedDataView().findObject(keyInfo, this, true/*isCreate*/, generateCallbacks, - localValue, disableCopyOnRead, preferCD, null, null, false, allowReadFromHDFS); + localValue, disableCopyOnRead, preferCD, null, null, false); // TODO why are we not passing the client event or returnTombstones in the above invokation? } @@ -1806,7 +1749,6 @@ public class LocalRegion extends AbstractRegion public final EntryEventImpl newPutEntryEvent(Object key, Object value, Object aCallbackArgument) { EntryEventImpl ev = newUpdateEntryEvent(key, value, aCallbackArgument); - ev.setFetchFromHDFS(false); ev.setPutDML(true); return ev; } @@ -1938,23 +1880,11 @@ public class LocalRegion extends AbstractRegion } } - protected boolean includeHDFSResults() { - return isUsedForPartitionedRegionBucket() - && isHDFSReadWriteRegion() - && getPartitionedRegion().includeHDFSResults(); - } - - /** a fast estimate of total number of entries locally in the region */ public long getEstimatedLocalSize() { RegionMap rm; if (!this.isDestroyed) { long size; - if (isHDFSReadWriteRegion() && this.initialized) { - // this size is not used by HDFS region iterators - // fixes bug 49239 - return 0; - } // if region has not been initialized yet, then get the estimate from // disk region's recovery map if available if (!this.initialized && this.diskRegion != null @@ -2266,9 +2196,6 @@ public class LocalRegion extends AbstractRegion if (this.imageState.isClient() && !this.concurrencyChecksEnabled) { return result - this.imageState.getDestroyedEntriesCount(); } - if (includeHDFSResults()) { - return result; - } return result - this.tombstoneCount.get(); } } @@ -3004,11 +2931,18 @@ public class LocalRegion extends AbstractRegion * @param clientEvent the client's event, if any. If not null, we set the version tag * @param returnTombstones TODO * @return the deserialized value - * @see DistributedRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean, boolean ) - */ - protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate, - TXStateInterface tx, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, - EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) + * @see LocalRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean) + */ + protected Object findObjectInSystem(KeyInfo keyInfo, + boolean isCreate, + TXStateInterface tx, + boolean generateCallbacks, + Object localValue, + boolean disableCopyOnRead, + boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) throws CacheLoaderException, TimeoutException { final Object key = keyInfo.getKey(); @@ -5383,9 +5317,6 @@ public class LocalRegion extends AbstractRegion // Notify bridge clients (if this is a BridgeServer) event.setEventType(eventType); notifyBridgeClients(event); - if (this.hdfsStoreName != null) { - notifyGatewaySender(eventType, event); - } if(callDispatchListenerEvent){ dispatchListenerEvent(eventType, event); } @@ -7271,24 +7202,8 @@ public class LocalRegion extends AbstractRegion if (generateEventID()) { event.setNewEventId(cache.getDistributedSystem()); } - event.setFetchFromHDFS(false); - return event; - } - - @Retained - protected EntryEventImpl generateCustomEvictDestroyEvent(final Object key) { - @Retained EntryEventImpl event = EntryEventImpl.create( - this, Operation.CUSTOM_EVICT_DESTROY, key, null/* newValue */, - null, false, getMyId()); - - // Fix for bug#36963 - if (generateEventID()) { - event.setNewEventId(cache.getDistributedSystem()); - } - event.setFetchFromHDFS(false); return event; } - /** * @return true if the evict destroy was done; false if it was not needed */ @@ -9941,8 +9856,6 @@ public class LocalRegion extends AbstractRegion } } - clearHDFSData(); - if (!isProxy()) { // Now we need to recreate all the indexes. //If the indexManager is null we don't have to worry @@ -9981,11 +9894,6 @@ public class LocalRegion extends AbstractRegion } } - /**Clear HDFS data, if present */ - protected void clearHDFSData() { - //do nothing, clear is implemented for subclasses like BucketRegion. - } - @Override void basicLocalClear(RegionEventImpl rEvent) { @@ -10762,7 +10670,6 @@ public class LocalRegion extends AbstractRegion } public final DistributedPutAllOperation newPutAllForPUTDmlOperation(Map map, Object callbackArg) { DistributedPutAllOperation dpao = newPutAllOperation(map, callbackArg); - dpao.getEvent().setFetchFromHDFS(false); dpao.getEvent().setPutDML(true); return dpao; } @@ -10818,7 +10725,6 @@ public class LocalRegion extends AbstractRegion putallOp, this, Operation.PUTALL_CREATE, key, value); try { - event.setFetchFromHDFS(putallOp.getEvent().isFetchFromHDFS()); event.setPutDML(putallOp.getEvent().isPutDML()); if (tagHolder != null) { @@ -12921,22 +12827,6 @@ public class LocalRegion extends AbstractRegion public Integer getCountNotFoundInLocal() { return countNotFoundInLocal.get(); } - /// End of Variables and methods for test Hook for HDFS /////// - public void forceHDFSCompaction(boolean isMajor, Integer maxWaitTime) { - throw new UnsupportedOperationException( - LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE - .toLocalizedString(getName())); - } - - public void flushHDFSQueue(int maxWaitTime) { - throw new UnsupportedOperationException( - LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE - .toLocalizedString(getName())); - } - - public long lastMajorHDFSCompaction() { - throw new UnsupportedOperationException(); - } public static void simulateClearForTests(boolean flag) { simulateClearForTests = flag; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java index 5193a17..c26ff10 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java @@ -17,7 +17,6 @@ package com.gemstone.gemfire.internal.cache; import java.util.Collection; -import java.util.Iterator; import java.util.Set; import com.gemstone.gemfire.cache.EntryNotFoundException; @@ -36,9 +35,16 @@ public class LocalRegionDataView implements InternalDataView { /* (non-Javadoc) * @see com.gemstone.gemfire.internal.cache.InternalDataView#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean) */ - public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, - boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadsFromHDFS, boolean retainResult) { - return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadsFromHDFS, retainResult); + public Object getDeserializedValue(KeyInfo keyInfo, + LocalRegion localRegion, + boolean updateStats, + boolean disableCopyOnRead, + boolean preferCD, + EntryEventImpl clientEvent, + boolean returnTombstones, + boolean retainResult) { + return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones, + retainResult); } /* (non-Javadoc) @@ -136,9 +142,17 @@ public class LocalRegionDataView implements InternalDataView { /* (non-Javadoc) * @see com.gemstone.gemfire.internal.cache.InternalDataView#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object) */ - public Object findObject(KeyInfo keyInfo, LocalRegion r, boolean isCreate, - boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) { - return r.nonTxnFindObject(keyInfo, isCreate, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS); + public Object findObject(KeyInfo keyInfo, + LocalRegion r, + boolean isCreate, + boolean generateCallbacks, + Object value, + boolean disableCopyOnRead, + boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) { + return r.nonTxnFindObject(keyInfo, isCreate, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones); } /* (non-Javadoc) @@ -180,7 +194,12 @@ public class LocalRegionDataView implements InternalDataView { * (non-Javadoc) * @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.BucketRegion, java.lang.Object, java.lang.Object) */ - public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException { + public Object getSerializedValue(LocalRegion localRegion, + KeyInfo key, + boolean doNotLockEntry, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent, + boolean returnTombstones) throws DataLocationException { throw new IllegalStateException(); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java index bb83383..4c1fa7f 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java @@ -461,26 +461,6 @@ public class NonLocalRegionEntry implements RegionEntry, VersionStamp { } @Override - public boolean isMarkedForEviction() { - throw new UnsupportedOperationException(LocalizedStrings - .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY - .toLocalizedString()); - } - @Override - public void setMarkedForEviction() { - throw new UnsupportedOperationException(LocalizedStrings - .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY - .toLocalizedString()); - } - - @Override - public void clearMarkedForEviction() { - throw new UnsupportedOperationException(LocalizedStrings - .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY - .toLocalizedString()); - } - - @Override public boolean isValueNull() { return (null == getValueAsToken()); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java index fe8813e..4728594 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java @@ -7384,19 +7384,6 @@ public final class Oplog implements CompactableOplog, Flushable { // TODO Auto-generated method stub } @Override - public boolean isMarkedForEviction() { - // TODO Auto-generated method stub - return false; - } - @Override - public void setMarkedForEviction() { - // TODO Auto-generated method stub - } - @Override - public void clearMarkedForEviction() { - // TODO Auto-generated method stub - } - @Override public boolean isInvalid() { // TODO Auto-generated method stub return false;