Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C2BE018FB8 for ; Mon, 22 Feb 2016 18:36:00 +0000 (UTC) Received: (qmail 74703 invoked by uid 500); 22 Feb 2016 18:36:00 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 74671 invoked by uid 500); 22 Feb 2016 18:36:00 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 74658 invoked by uid 99); 22 Feb 2016 18:36:00 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Feb 2016 18:36:00 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 0301AC3A79 for ; Mon, 22 Feb 2016 18:36:00 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.021 X-Spam-Level: X-Spam-Status: No, score=-4.021 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id ysQRHRUgLqTv for ; Mon, 22 Feb 2016 18:35:43 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 638245FB6A for ; Mon, 22 Feb 2016 18:35:26 +0000 (UTC) Received: (qmail 67935 invoked by uid 99); 22 Feb 2016 18:35:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Feb 2016 18:35:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F1D85E0492; Mon, 22 Feb 2016 18:35:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: upthewaterspout@apache.org To: commits@geode.incubator.apache.org Date: Mon, 22 Feb 2016 18:36:33 -0000 Message-Id: <532195f3c9984e2982ff576435c31cea@git.apache.org> In-Reply-To: <0806417cf36345658593943fc13e8a5d@git.apache.org> References: <0806417cf36345658593943fc13e8a5d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [71/83] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java index 0000000,b145a91..541c453 mode 000000,100755..100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java @@@ -1,0 -1,11398 +1,11398 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package com.gemstone.gemfire.internal.cache; + + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.io.Serializable; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.Comparator; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Hashtable; + import java.util.Iterator; + import java.util.LinkedList; + import java.util.List; + import java.util.Map; + import java.util.NoSuchElementException; + import java.util.Random; + import java.util.Set; + import java.util.concurrent.Callable; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.CopyOnWriteArrayList; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.Executors; + import java.util.concurrent.Future; + import java.util.concurrent.FutureTask; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ThreadFactory; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.concurrent.locks.Lock; + + import org.apache.logging.log4j.Logger; + + import com.gemstone.gemfire.CancelException; + import com.gemstone.gemfire.InternalGemFireException; + import com.gemstone.gemfire.StatisticsFactory; + import com.gemstone.gemfire.SystemFailure; + import com.gemstone.gemfire.cache.AttributesFactory; + import com.gemstone.gemfire.cache.AttributesMutator; + import com.gemstone.gemfire.cache.Cache; + import com.gemstone.gemfire.cache.CacheClosedException; + import com.gemstone.gemfire.cache.CacheException; + import com.gemstone.gemfire.cache.CacheListener; + import com.gemstone.gemfire.cache.CacheLoader; + import com.gemstone.gemfire.cache.CacheLoaderException; + import com.gemstone.gemfire.cache.CacheStatistics; + import com.gemstone.gemfire.cache.CacheWriter; + import com.gemstone.gemfire.cache.CacheWriterException; + import com.gemstone.gemfire.cache.CustomExpiry; + import com.gemstone.gemfire.cache.DataPolicy; + import com.gemstone.gemfire.cache.DiskAccessException; + import com.gemstone.gemfire.cache.EntryExistsException; + import com.gemstone.gemfire.cache.EntryNotFoundException; + import com.gemstone.gemfire.cache.ExpirationAttributes; + import com.gemstone.gemfire.cache.InterestPolicy; + import com.gemstone.gemfire.cache.InterestRegistrationEvent; + import com.gemstone.gemfire.cache.LoaderHelper; + import com.gemstone.gemfire.cache.LowMemoryException; + import com.gemstone.gemfire.cache.Operation; + import com.gemstone.gemfire.cache.PartitionAttributes; + import com.gemstone.gemfire.cache.PartitionResolver; + import com.gemstone.gemfire.cache.PartitionedRegionDistributionException; + import com.gemstone.gemfire.cache.PartitionedRegionStorageException; + import com.gemstone.gemfire.cache.Region; + import com.gemstone.gemfire.cache.RegionAttributes; + import com.gemstone.gemfire.cache.RegionDestroyedException; + import com.gemstone.gemfire.cache.RegionEvent; + import com.gemstone.gemfire.cache.RegionExistsException; + import com.gemstone.gemfire.cache.RegionMembershipListener; + import com.gemstone.gemfire.cache.TimeoutException; + import com.gemstone.gemfire.cache.TransactionDataNotColocatedException; + import com.gemstone.gemfire.cache.TransactionDataRebalancedException; + import com.gemstone.gemfire.cache.TransactionException; + import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; + import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats; + import com.gemstone.gemfire.cache.execute.EmtpyRegionFunctionException; + import com.gemstone.gemfire.cache.execute.Function; + import com.gemstone.gemfire.cache.execute.FunctionContext; + import com.gemstone.gemfire.cache.execute.FunctionException; + import com.gemstone.gemfire.cache.execute.FunctionService; + import com.gemstone.gemfire.cache.execute.ResultCollector; + import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl; + import com.gemstone.gemfire.cache.hdfs.internal.hoplog.CompactionStatus; + import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction; + import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionArgs; + import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionFunction; + import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionResultCollector; + import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSLastCompactionTimeFunction; + import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector; + import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer; + import com.gemstone.gemfire.cache.partition.PartitionListener; + import com.gemstone.gemfire.cache.partition.PartitionNotAvailableException; + import com.gemstone.gemfire.cache.query.FunctionDomainException; + import com.gemstone.gemfire.cache.query.Index; + import com.gemstone.gemfire.cache.query.IndexCreationException; + import com.gemstone.gemfire.cache.query.IndexExistsException; + import com.gemstone.gemfire.cache.query.IndexInvalidException; + import com.gemstone.gemfire.cache.query.IndexNameConflictException; + import com.gemstone.gemfire.cache.query.IndexType; + import com.gemstone.gemfire.cache.query.MultiIndexCreationException; + import com.gemstone.gemfire.cache.query.NameResolutionException; + import com.gemstone.gemfire.cache.query.QueryException; + import com.gemstone.gemfire.cache.query.QueryInvocationTargetException; + import com.gemstone.gemfire.cache.query.SelectResults; + import com.gemstone.gemfire.cache.query.TypeMismatchException; + import com.gemstone.gemfire.cache.query.internal.CompiledSelect; + import com.gemstone.gemfire.cache.query.internal.DefaultQuery; + import com.gemstone.gemfire.cache.query.internal.ExecutionContext; + import com.gemstone.gemfire.cache.query.internal.QCompiler; + import com.gemstone.gemfire.cache.query.internal.QueryExecutor; + import com.gemstone.gemfire.cache.query.internal.ResultsBag; + import com.gemstone.gemfire.cache.query.internal.ResultsCollectionWrapper; + import com.gemstone.gemfire.cache.query.internal.ResultsSet; + import com.gemstone.gemfire.cache.query.internal.index.AbstractIndex; + import com.gemstone.gemfire.cache.query.internal.index.IndexCreationData; + import com.gemstone.gemfire.cache.query.internal.index.IndexManager; + import com.gemstone.gemfire.cache.query.internal.index.IndexUtils; + import com.gemstone.gemfire.cache.query.internal.index.PartitionedIndex; + import com.gemstone.gemfire.cache.query.internal.types.ObjectTypeImpl; + import com.gemstone.gemfire.cache.query.types.ObjectType; + import com.gemstone.gemfire.cache.wan.GatewaySender; + import com.gemstone.gemfire.distributed.DistributedLockService; + import com.gemstone.gemfire.distributed.DistributedMember; + import com.gemstone.gemfire.distributed.LockServiceDestroyedException; + import com.gemstone.gemfire.distributed.internal.DM; + import com.gemstone.gemfire.distributed.internal.DistributionAdvisee; + import com.gemstone.gemfire.distributed.internal.DistributionAdvisor; + import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile; + import com.gemstone.gemfire.distributed.internal.DistributionManager; + import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; + import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem.DisconnectListener; + import com.gemstone.gemfire.distributed.internal.MembershipListener; + import com.gemstone.gemfire.distributed.internal.ProfileListener; + import com.gemstone.gemfire.distributed.internal.ReplyException; + import com.gemstone.gemfire.distributed.internal.ReplyProcessor21; + import com.gemstone.gemfire.distributed.internal.locks.DLockRemoteToken; + import com.gemstone.gemfire.distributed.internal.locks.DLockService; + import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; + import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes; + import com.gemstone.gemfire.internal.Assert; + import com.gemstone.gemfire.internal.NanoTimer; + import com.gemstone.gemfire.internal.SetUtils; + import com.gemstone.gemfire.internal.Version; + import com.gemstone.gemfire.internal.cache.BucketAdvisor.ServerBucketProfile; + import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile; + import com.gemstone.gemfire.internal.cache.DestroyPartitionedRegionMessage.DestroyPartitionedRegionResponse; + import com.gemstone.gemfire.internal.cache.PutAllPartialResultException.PutAllPartialResult; + import com.gemstone.gemfire.internal.cache.control.HeapMemoryMonitor; + import com.gemstone.gemfire.internal.cache.control.InternalResourceManager; + import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType; + import com.gemstone.gemfire.internal.cache.control.MemoryEvent; + import com.gemstone.gemfire.internal.cache.control.MemoryThresholds; + import com.gemstone.gemfire.internal.cache.execute.AbstractExecution; + import com.gemstone.gemfire.internal.cache.execute.FunctionExecutionNodePruner; + import com.gemstone.gemfire.internal.cache.execute.FunctionRemoteContext; + import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException; + import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector; + import com.gemstone.gemfire.internal.cache.execute.PartitionedRegionFunctionExecutor; + import com.gemstone.gemfire.internal.cache.execute.PartitionedRegionFunctionResultSender; + import com.gemstone.gemfire.internal.cache.execute.PartitionedRegionFunctionResultWaiter; + import com.gemstone.gemfire.internal.cache.execute.RegionFunctionContextImpl; + import com.gemstone.gemfire.internal.cache.execute.ServerToClientFunctionResultSender; + import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier; + import com.gemstone.gemfire.internal.cache.lru.HeapEvictor; + import com.gemstone.gemfire.internal.cache.lru.LRUStatistics; + import com.gemstone.gemfire.internal.cache.partitioned.ContainsKeyValueMessage; + import com.gemstone.gemfire.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse; + import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage; + import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage.DestroyResponse; + import com.gemstone.gemfire.internal.cache.partitioned.DestroyRegionOnDataStoreMessage; + import com.gemstone.gemfire.internal.cache.partitioned.DumpAllPRConfigMessage; + import com.gemstone.gemfire.internal.cache.partitioned.DumpB2NRegion; + import com.gemstone.gemfire.internal.cache.partitioned.DumpB2NRegion.DumpB2NResponse; + import com.gemstone.gemfire.internal.cache.partitioned.DumpBucketsMessage; + import com.gemstone.gemfire.internal.cache.partitioned.FetchBulkEntriesMessage; + import com.gemstone.gemfire.internal.cache.partitioned.FetchBulkEntriesMessage.FetchBulkEntriesResponse; + import com.gemstone.gemfire.internal.cache.partitioned.FetchEntriesMessage; + import com.gemstone.gemfire.internal.cache.partitioned.FetchEntriesMessage.FetchEntriesResponse; + import com.gemstone.gemfire.internal.cache.partitioned.FetchEntryMessage; + import com.gemstone.gemfire.internal.cache.partitioned.FetchEntryMessage.FetchEntryResponse; + import com.gemstone.gemfire.internal.cache.partitioned.FetchKeysMessage; + import com.gemstone.gemfire.internal.cache.partitioned.FetchKeysMessage.FetchKeysResponse; + import com.gemstone.gemfire.internal.cache.partitioned.GetMessage; + import com.gemstone.gemfire.internal.cache.partitioned.GetMessage.GetResponse; + import com.gemstone.gemfire.internal.cache.partitioned.IdentityRequestMessage; + import com.gemstone.gemfire.internal.cache.partitioned.IdentityRequestMessage.IdentityResponse; + import com.gemstone.gemfire.internal.cache.partitioned.IdentityUpdateMessage; + import com.gemstone.gemfire.internal.cache.partitioned.IdentityUpdateMessage.IdentityUpdateResponse; + import com.gemstone.gemfire.internal.cache.partitioned.IndexCreationMsg; + import com.gemstone.gemfire.internal.cache.partitioned.InterestEventMessage; + import com.gemstone.gemfire.internal.cache.partitioned.InterestEventMessage.InterestEventResponse; + import com.gemstone.gemfire.internal.cache.partitioned.InvalidateMessage; + import com.gemstone.gemfire.internal.cache.partitioned.InvalidateMessage.InvalidateResponse; + import com.gemstone.gemfire.internal.cache.partitioned.PREntriesIterator; + import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException; + import com.gemstone.gemfire.internal.cache.partitioned.PRSanityCheckMessage; + import com.gemstone.gemfire.internal.cache.partitioned.PRUpdateEntryVersionMessage; + import com.gemstone.gemfire.internal.cache.partitioned.PRUpdateEntryVersionMessage.UpdateEntryVersionResponse; + import com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage.PartitionResponse; + import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionObserver; + import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionObserverHolder; + import com.gemstone.gemfire.internal.cache.partitioned.PutAllPRMessage; + import com.gemstone.gemfire.internal.cache.partitioned.PutMessage; + import com.gemstone.gemfire.internal.cache.partitioned.PutMessage.PutResult; + import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor; + import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.BucketVisitor; + import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.PartitionProfile; + import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage; + import com.gemstone.gemfire.internal.cache.partitioned.RemoveIndexesMessage; + import com.gemstone.gemfire.internal.cache.partitioned.SizeMessage; + import com.gemstone.gemfire.internal.cache.partitioned.SizeMessage.SizeResponse; + import com.gemstone.gemfire.internal.cache.persistence.PRPersistentConfig; + import com.gemstone.gemfire.internal.cache.tier.InterestType; + import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand; + import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; + import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection; + import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList; + import com.gemstone.gemfire.internal.cache.tier.sockets.command.Get70; + import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException; + import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector; + import com.gemstone.gemfire.internal.cache.versions.VersionStamp; + import com.gemstone.gemfire.internal.cache.versions.VersionTag; + import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; + import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor; + import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException; + import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException; + import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; + import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue; + import com.gemstone.gemfire.internal.i18n.LocalizedStrings; + import com.gemstone.gemfire.internal.logging.LogService; + import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; + import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; + import com.gemstone.gemfire.internal.logging.log4j.LogMarker; -import com.gemstone.gemfire.internal.offheap.Chunk; ++import com.gemstone.gemfire.internal.offheap.ObjectChunk; + import com.gemstone.gemfire.internal.offheap.annotations.Unretained; + import com.gemstone.gemfire.internal.sequencelog.RegionLogger; + import com.gemstone.gemfire.internal.util.TransformUtils; + import com.gemstone.gemfire.internal.util.concurrent.FutureResult; + import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch; + import com.gemstone.gemfire.i18n.StringId; + + /** + * A Region whose total storage is split into chunks of data (partitions) which + * are copied up to a configurable level (for high availability) and placed on + * multiple VMs for improved performance and increased storage capacity. + * + */ + public class PartitionedRegion extends LocalRegion implements + CacheDistributionAdvisee, QueryExecutor { + + public static final Random rand = new Random(Long.getLong( + "gemfire.PartitionedRegionRandomSeed", NanoTimer.getTime()).longValue()); + + private static final AtomicInteger SERIAL_NUMBER_GENERATOR = new AtomicInteger(); + + private final DiskRegionStats diskRegionStats; + /** + * Changes scope of replication to secondary bucket to SCOPE.DISTRIBUTED_NO_ACK + */ + public static final boolean DISABLE_SECONDARY_BUCKET_ACK = Boolean.getBoolean( + "gemfire.disablePartitionedRegionBucketAck"); + + /** + * A debug flag used for testing calculation of starting bucket id + */ + public static boolean BEFORE_CALCULATE_STARTING_BUCKET_FLAG = false; + + /** + * Thread specific random number + */ + private static ThreadLocal threadRandom = new ThreadLocal() { + @Override + protected Object initialValue() { + int i = rand.nextInt(); + if (i < 0) { + i = -1 * i; + } + return Integer.valueOf(i); + } + }; + + /** + * Global Region for storing PR config ( PRName->PRConfig). This region would + * be used to resolve PR name conflict.* + */ + private volatile Region prRoot; + + /** + * + * PartitionedRegionDataStore class takes care of data storage for the PR. + * This will contain the bucket Regions to store data entries for PR* + */ + protected PartitionedRegionDataStore dataStore; + + /** + * The advisor that hold information about this partitioned region + */ + private final RegionAdvisor distAdvisor; + + /** Logging mechanism for debugging */ + private static final Logger logger = LogService.getLogger(); + + /** cleanup flags * */ + private boolean cleanPRRegistration = false; + + /** Time to wait for for acquiring distributed lock ownership */ + final static long VM_OWNERSHIP_WAIT_TIME = PRSystemPropertyGetter + .parseLong( + System + .getProperty(PartitionedRegionHelper.VM_OWNERSHIP_WAIT_TIME_PROPERTY), + PartitionedRegionHelper.VM_OWNERSHIP_WAIT_TIME_DEFAULT); + + /** + * default redundancy level is 0. + */ + final int redundantCopies; + + /** + * The miminum amount of redundancy needed for a write operation + */ + final int minimumWriteRedundancy; + + /** + * The miminum amount of redundancy needed for a read operation + */ + final int minimumReadRedundancy; + + /** + * Ratio of currently allocated memory to maxMemory that triggers rebalance + * activity. + */ + final static float rebalanceThreshold = 0.75f; + + /** The maximum memory allocated for this node in Mb */ + final int localMaxMemory; + + /** The maximum milliseconds for retrying operations */ + final private int retryTimeout; + + /** + * The statistics for this PR + */ + public final PartitionedRegionStats prStats; + + // private Random random = new Random(System.currentTimeMillis()); + + /** Number of initial buckets */ + private final int totalNumberOfBuckets; + + /** + * To check if local cache is enabled. + */ + private static final boolean localCacheEnabled = false; + + // private static final boolean throwIfNoNodesLeft = true; + + public static final int DEFAULT_RETRY_ITERATIONS = 3; + + /** + * Flag to indicate if a cache loader is present + */ + private volatile boolean haveCacheLoader; + + /** + * Region identifier used for DLocks (Bucket and Region) + */ + private final String regionIdentifier; + + /** + * Maps each PR to a prId. This prId will uniquely identify the PR. + */ + static final PRIdMap prIdToPR = new PRIdMap(); + + /** + * Flag to indicate whether region is closed + * + */ + public volatile boolean isClosed = false; + + /** + * a flag indicating that the PR is destroyed in this VM + */ + public volatile boolean isLocallyDestroyed = false; + + /** + * the thread locally destroying this pr. not volatile, + * so always check isLocallyDestroyed before checking locallyDestroyingThread + * + * Concurrency: {@link #isLocallyDestroyed} is volatile + */ + public Thread locallyDestroyingThread; + + // TODO someone please add a javadoc for this + private volatile boolean hasPartitionedIndex = false; + + /** + * regionMembershipListener notification requires this to be plugged into + * a PR's RegionAdvisor + */ + private final AdvisorListener advisorListener = new AdvisorListener(); + + /* + * Map containing or Index>. + * IndexTask represents an index thats completely created or + * one thats in create phase. This is done in order to avoid + * synchronization on the indexes. + */ + private final ConcurrentMap indexes = new ConcurrentHashMap(); + + private volatile boolean recoveredFromDisk; + + public static final int RUNNING_MODE = -1; + public static final int PRIMARY_BUCKETS_LOCKED = 1; + public static final int DISK_STORE_FLUSHED = 2; + public static final int OFFLINE_EQUAL_PERSISTED = 3; + + private volatile int shutDownAllStatus = RUNNING_MODE; + + private final long birthTime = System.currentTimeMillis(); + + public void setShutDownAllStatus(int newStatus) { + this.shutDownAllStatus = newStatus; + } + + private final PartitionedRegion colocatedWithRegion; + + private List sortedBuckets; + + private ScheduledExecutorService bucketSorter; + + private ConcurrentMap partitionsMap = new ConcurrentHashMap(); + + public ConcurrentMap getPartitionsMap() { + return this.partitionsMap; + } + /** + * for wan shadowPR + */ + private boolean enableConflation; + + private final Object indexLock = new Object(); + + /** + * Byte 0 = no NWHOP Byte 1 = NWHOP to servers in same server-grp Byte 2 = + * NWHOP tp servers in other server-grp + */ + private final ThreadLocal isNetworkHop = new ThreadLocal() { + @Override + protected Byte initialValue() { + return Byte.valueOf((byte)0); + } + }; + + public void setIsNetworkHop(Byte value) { + this.isNetworkHop.set(value); + } + + public Byte isNetworkHop() { + return this.isNetworkHop.get(); + } + + private final ThreadLocal metadataVersion = new ThreadLocal() { + @Override + protected Byte initialValue() { + return 0; + } + }; + + public void setMetadataVersion(Byte value) { + this.metadataVersion.set(value); + } + + public Byte getMetadataVersion() { + return this.metadataVersion.get(); + } + + + /** + * Returns the LRUStatistics for this PR. + * This is needed to find the single instance of LRUStatistics + * created early for a PR when it is recovered from disk. + * This fixes bug 41938 + */ + public LRUStatistics getPRLRUStatsDuringInitialization() { + LRUStatistics result = null; + if (getDiskStore() != null) { + result = getDiskStore().getPRLRUStats(this); + } + return result; + } + + + ////////////////// ConcurrentMap methods ////////////////// + + @Override + public boolean remove(Object key, Object value, Object callbackArg) { + final long startTime = PartitionedRegionStats.startTime(); + try { + return super.remove(key, value, callbackArg); + } + finally { + this.prStats.endDestroy(startTime); + } + } + + + + ////////////////// End of ConcurrentMap methods ////////////////// + + + public PartitionListener[] getPartitionListeners() { + return this.partitionListeners; + } + + /** + * Return canonical representation for a bucket (for logging) + * + * @param bucketId + * the bucket + * @return a String representing this PR and the bucket + */ + public String bucketStringForLogs(int bucketId) { + return getPRId() + BUCKET_ID_SEPARATOR + bucketId; + } + + /** Separator between PRId and bucketId for creating bucketString */ + public static final String BUCKET_ID_SEPARATOR = ":"; + + /** + * Clear the prIdMap, typically used when disconnecting from the distributed + * system or clearing the cache + */ + public static void clearPRIdMap() { + synchronized (prIdToPR) { + prIdToPR.clear(); + } + } + + private static DisconnectListener dsPRIdCleanUpListener = new DisconnectListener() { + @Override + public String toString() { + return LocalizedStrings.PartitionedRegion_SHUTDOWN_LISTENER_FOR_PARTITIONEDREGION.toLocalizedString(); + } + + public void onDisconnect(InternalDistributedSystem sys) { + clearPRIdMap(); + } + }; + + + public static class PRIdMap extends HashMap { + private static final long serialVersionUID = 3667357372967498179L; + public final static String DESTROYED = "Partitioned Region Destroyed"; + + final static String LOCALLY_DESTROYED = "Partitioned Region Is Locally Destroyed"; + + final static String FAILED_REGISTRATION = "Partitioned Region's Registration Failed"; + + public final static String NO_PATH_FOUND = "NoPathFound"; + + private volatile boolean cleared = true; + + @Override + public Object get(Object key) { + throw new UnsupportedOperationException(LocalizedStrings.PartitionedRegion_PRIDMAPGET_NOT_SUPPORTED_USE_GETREGION_INSTEAD.toLocalizedString()); + } + + public Object getRegion(Object key) throws PRLocallyDestroyedException { + if (cleared) { + Cache c = GemFireCacheImpl.getInstance(); + if (c == null) { + throw new CacheClosedException(); + } + else { + c.getCancelCriterion().checkCancelInProgress(null); + } + } + Assert.assertTrue(key instanceof Integer); + + Object o = super.get(key); + if (o == DESTROYED) { + throw new RegionDestroyedException(LocalizedStrings.PartitionedRegion_REGION_FOR_PRID_0_IS_DESTROYED.toLocalizedString(key), NO_PATH_FOUND); + } + if (o == LOCALLY_DESTROYED) { + throw new PRLocallyDestroyedException(LocalizedStrings.PartitionedRegion_REGION_WITH_PRID_0_IS_LOCALLY_DESTROYED_ON_THIS_NODE.toLocalizedString(key)); + } + if (o == FAILED_REGISTRATION) { + throw new PRLocallyDestroyedException(LocalizedStrings.PartitionedRegion_REGION_WITH_PRID_0_FAILED_INITIALIZATION_ON_THIS_NODE.toLocalizedString(key)); + } + return o; + } + + @Override + public Object remove(final Object key) { + return put(key, DESTROYED, true); + } + + @Override + public Object put(final Object key, final Object value) { + return put(key, value, true); + } + + public Object put(final Object key, final Object value, + boolean sendIdentityRequestMessage) { + if (cleared) { + cleared = false; + } + + if (key == null) { + throw new NullPointerException(LocalizedStrings.PartitionedRegion_NULL_KEY_NOT_ALLOWED_FOR_PRIDTOPR_MAP.toLocalizedString()); + } + if (value == null) { + throw new NullPointerException(LocalizedStrings.PartitionedRegion_NULL_VALUE_NOT_ALLOWED_FOR_PRIDTOPR_MAP.toLocalizedString()); + } + Assert.assertTrue(key instanceof Integer); + if (sendIdentityRequestMessage) + IdentityRequestMessage.setLatestId(((Integer)key).intValue()); + if ((super.get(key) == DESTROYED) && (value instanceof PartitionedRegion)) { + PartitionedRegionException pre = new PartitionedRegionException(LocalizedStrings.PartitionedRegion_CAN_NOT_REUSE_OLD_PARTITIONED_REGION_ID_0.toLocalizedString(key)); + throw pre; + } + return super.put(key, value); + } + + @Override + public void clear() { + this.cleared = true; + super.clear(); + } + + public synchronized String dump() { + StringBuffer b = new StringBuffer("prIdToPR Map@"); + b.append(System.identityHashCode(prIdToPR)).append(":\n"); + Map.Entry me; + for (Iterator i = prIdToPR.entrySet().iterator(); i.hasNext();) { + me = (Map.Entry)i.next(); + b.append(me.getKey()).append("=>").append(me.getValue()); + if (i.hasNext()) { + b.append("\n"); + } + } + return b.toString(); + } + } + + private int partitionedRegionId = -3; + + // final private Scope userScope; + + /** Node description */ + final private Node node; + + /** Helper Object for redundancy Management of PartitionedRegion */ + private final PRHARedundancyProvider redundancyProvider; + + /** + * flag saying whether this VM needs cache operation notifications from other + * members + */ + private boolean requiresNotification; + + /** + * Latch that signals when the Bucket meta-data is ready to receive updates + */ + private final StoppableCountDownLatch initializationLatchAfterBucketIntialization; + + /** + * Constructor for a PartitionedRegion. This has an accessor (Region API) + * functionality and contains a datastore for actual storage. An accessor can + * act as a local cache by having a local storage enabled. A PartitionedRegion + * can be created by a factory method of RegionFactory.java and also by + * invoking Cache.createRegion(). (Cache.xml etc to be added) + * + */ + + static public final String RETRY_TIMEOUT_PROPERTY = + "gemfire.partitionedRegionRetryTimeout"; + + private final PartitionRegionConfigValidator validator ; + + final List fixedPAttrs; + + private byte fixedPASet; + + public List colocatedByList= new CopyOnWriteArrayList(); + + private final PartitionListener[] partitionListeners; + + private boolean isShadowPR = false; + private boolean isShadowPRForHDFS = false; + + private AbstractGatewaySender parallelGatewaySender = null; + + private final ThreadLocal queryHDFS = new ThreadLocal() { + @Override + protected Boolean initialValue() { + return false; + } + }; + + public PartitionedRegion(String regionname, RegionAttributes ra, + LocalRegion parentRegion, GemFireCacheImpl cache, + InternalRegionArguments internalRegionArgs) { + super(regionname, ra, parentRegion, cache, internalRegionArgs); + + this.node = initializeNode(); + this.prStats = new PartitionedRegionStats(cache.getDistributedSystem(), getFullPath()); + this.regionIdentifier = getFullPath().replace('/', '#'); + + if (logger.isDebugEnabled()) { + logger.debug("Constructing Partitioned Region {}", regionname); + } + + // By adding this disconnect listener we ensure that the pridmap is cleaned + // up upon + // distributed system disconnect even this (or other) PRs are destroyed + // (which prevents pridmap cleanup). + cache.getDistributedSystem().addDisconnectListener(dsPRIdCleanUpListener); + + // add an async queue for the region if the store name is not null. + if (this.getHDFSStoreName() != null) { + String eventQueueName = getHDFSEventQueueName(); + super.addAsyncEventQueueId(eventQueueName); + } + + // this.userScope = ra.getScope(); + this.partitionAttributes = ra.getPartitionAttributes(); + this.localMaxMemory = this.partitionAttributes.getLocalMaxMemory(); + this.retryTimeout = Integer.getInteger(RETRY_TIMEOUT_PROPERTY, + PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION).intValue(); + this.totalNumberOfBuckets = this.partitionAttributes.getTotalNumBuckets(); + this.prStats.incTotalNumBuckets(this.totalNumberOfBuckets); + this.distAdvisor = RegionAdvisor.createRegionAdvisor(this); // Warning: potential early escape of instance + this.redundancyProvider = new PRHARedundancyProvider(this); // Warning: + // potential + // early escape + // instance + + // localCacheEnabled = ra.getPartitionAttributes().isLocalCacheEnabled(); + // This is to make sure that local-cache get and put works properly. + // getScope is overridden to return the correct scope. + // this.scope = Scope.LOCAL; + this.redundantCopies = ra.getPartitionAttributes().getRedundantCopies(); + this.prStats.setConfiguredRedundantCopies(ra.getPartitionAttributes().getRedundantCopies()); + this.prStats.setLocalMaxMemory(ra.getPartitionAttributes().getLocalMaxMemory() * 1024L * 1024); + + // No redundancy required for writes + this.minimumWriteRedundancy = Integer.getInteger( + "gemfire.mimimumPartitionedRegionWriteRedundancy", 0).intValue(); + // No redundancy required for reads + this.minimumReadRedundancy = Integer.getInteger( + "gemfire.mimimumPartitionedRegionReadRedundancy", 0).intValue(); + + this.haveCacheLoader = ra.getCacheLoader() != null; + + this.initializationLatchAfterBucketIntialization = new StoppableCountDownLatch( + this.getCancelCriterion(), 1); + + this.validator = new PartitionRegionConfigValidator(this); + this.partitionListeners = this.partitionAttributes.getPartitionListeners(); + + this.colocatedWithRegion = ColocationHelper.getColocatedRegion(this); + if (colocatedWithRegion != null) { + //In colocation chain, child region inherita the fixed partitin attributes from parent region. + this.fixedPAttrs = colocatedWithRegion.getFixedPartitionAttributesImpl(); + this.fixedPASet = colocatedWithRegion.fixedPASet; + synchronized (colocatedWithRegion.colocatedByList) { + colocatedWithRegion.colocatedByList.add(this); + } + } + else { + this.fixedPAttrs = this.partitionAttributes.getFixedPartitionAttributes(); + this.fixedPASet = 0; + } + if (logger.isDebugEnabled()) { + logger.debug("Partitioned Region {} constructed {}", regionname, (this.haveCacheLoader ? "with a cache loader" : "")); + } + if (this.getEvictionAttributes() != null + && this.getEvictionAttributes().getAlgorithm().isLRUHeap()) { + this.sortedBuckets = new ArrayList(); + final ThreadGroup grp = LoggingThreadGroup.createThreadGroup("BucketSorterThread", logger); + ThreadFactory tf = new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread t = new Thread(grp, r, "BucketSorterThread"); + t.setDaemon(true); + return t; + } + }; + this.bucketSorter = Executors.newScheduledThreadPool(1, tf); + } + // If eviction is on, Create an instance of PartitionedRegionLRUStatistics + if ((this.getEvictionAttributes() != null + && !this.getEvictionAttributes().getAlgorithm().isNone() + && this.getEvictionAttributes().getAction().isOverflowToDisk()) + || this.getDataPolicy().withPersistence()) { + StatisticsFactory sf = this.getCache().getDistributedSystem(); + this.diskRegionStats = new DiskRegionStats(sf, getFullPath()); + } else { + this.diskRegionStats = null; + } + if (internalRegionArgs.isUsedForParallelGatewaySenderQueue()) { + this.isShadowPR = true; + this.parallelGatewaySender = internalRegionArgs.getParallelGatewaySender(); + if (internalRegionArgs.isUsedForHDFSParallelGatewaySenderQueue()) + this.isShadowPRForHDFS = true; + } + + + /* + * Start persistent profile logging if we are a persistent region. + */ + if(dataPolicy.withPersistence()) { + startPersistenceProfileLogging(); + } + } + + /** + * Monitors when other members that participate in this persistent region are removed and creates + * a log entry marking the event. + */ + private void startPersistenceProfileLogging() { + this.distAdvisor.addProfileChangeListener(new ProfileListener() { + @Override + public void profileCreated(Profile profile) { + } + + @Override + public void profileUpdated(Profile profile) { + } + + @Override + public void profileRemoved(Profile profile, boolean destroyed) { + /* + * Don't bother logging membership activity if our region isn't ready. + */ + if(isInitialized()) { + CacheProfile cacheProfile = ((profile instanceof CacheProfile) ? (CacheProfile) profile : null); + Set onlineMembers = new HashSet(); + + TransformUtils.transform(PartitionedRegion.this.distAdvisor.advisePersistentMembers().values(),onlineMembers,TransformUtils.persistentMemberIdToLogEntryTransformer); + + logger.info(LocalizedMessage.create(LocalizedStrings.PersistenceAdvisorImpl_PERSISTENT_VIEW, + new Object[] {PartitionedRegion.this.getName(),TransformUtils.persistentMemberIdToLogEntryTransformer.transform(cacheProfile.persistentID),onlineMembers})); + } + } + }); + } + + @Override + public final boolean isHDFSRegion() { + return this.getHDFSStoreName() != null; + } + + @Override + public final boolean isHDFSReadWriteRegion() { + return isHDFSRegion() && !getHDFSWriteOnly(); + } + + @Override + protected final boolean isHDFSWriteOnly() { + return isHDFSRegion() && getHDFSWriteOnly(); + } + + public final void setQueryHDFS(boolean includeHDFS) { + queryHDFS.set(includeHDFS); + } + + @Override + public final boolean includeHDFSResults() { + return queryHDFS.get(); + } + + public final boolean isShadowPR() { + return isShadowPR; + } + + public final boolean isShadowPRForHDFS() { + return isShadowPRForHDFS; + } + + public AbstractGatewaySender getParallelGatewaySender() { + return parallelGatewaySender; + } + + public Set getParallelGatewaySenderIds() { + Set regionGatewaySenderIds = this.getAllGatewaySenderIds(); + if (regionGatewaySenderIds.isEmpty()) { + return Collections.EMPTY_SET; + } + Set cacheGatewaySenders = getCache().getAllGatewaySenders(); + Set parallelGatewaySenderIds = new HashSet(); + for (GatewaySender sender : cacheGatewaySenders) { + if (regionGatewaySenderIds.contains(sender.getId()) + && sender.isParallel()) { + parallelGatewaySenderIds.add(sender.getId()); + } + } + return parallelGatewaySenderIds; + } + + List getColocatedByList() { + return this.colocatedByList; + } + + public boolean isColocatedBy() { + return !this.colocatedByList.isEmpty(); + } + + private void createAndValidatePersistentConfig() { + DiskStoreImpl dsi = this.getDiskStore(); + if (this.dataPolicy.withPersistence() && !this.concurrencyChecksEnabled + && supportsConcurrencyChecks()) { + logger.info(LocalizedMessage.create(LocalizedStrings.PartitionedRegion_ENABLING_CONCURRENCY_CHECKS_FOR_PERSISTENT_PR, this.getFullPath())); + this.concurrencyChecksEnabled = true; + } + if (dsi != null && this.getDataPolicy().withPersistence()) { + String colocatedWith = colocatedWithRegion == null + ? "" : colocatedWithRegion.getFullPath(); + PRPersistentConfig config = dsi.getPersistentPRConfig(this.getFullPath()); + if(config != null) { + if (config.getTotalNumBuckets() != this.getTotalNumberOfBuckets()) { + Object[] prms = new Object[] { this.getFullPath(), this.getTotalNumberOfBuckets(), + config.getTotalNumBuckets() }; + IllegalStateException ise = new IllegalStateException( + LocalizedStrings.PartitionedRegion_FOR_REGION_0_TotalBucketNum_1_SHOULD_NOT_BE_CHANGED_Previous_Configured_2.toString(prms)); + throw ise; + } + //Make sure we don't change to be colocated with a different region + //We also can't change from colocated to not colocated without writing + //a record to disk, so we won't allow that right now either. + if (!colocatedWith.equals(config.getColocatedWith())) { + Object[] prms = new Object[] { this.getFullPath(), colocatedWith, + config.getColocatedWith() }; + DiskAccessException dae = new DiskAccessException(LocalizedStrings.LocalRegion_A_DISKACCESSEXCEPTION_HAS_OCCURED_WHILE_WRITING_TO_THE_DISK_FOR_REGION_0_THE_REGION_WILL_BE_CLOSED.toLocalizedString(this.getFullPath()), null, dsi); + dsi.handleDiskAccessException(dae); + IllegalStateException ise = new IllegalStateException( + LocalizedStrings.PartitionedRegion_FOR_REGION_0_ColocatedWith_1_SHOULD_NOT_BE_CHANGED_Previous_Configured_2.toString(prms)); + throw ise; + } + } else { + + config= new PRPersistentConfig(this.getTotalNumberOfBuckets(), + colocatedWith); + dsi.addPersistentPR(this.getFullPath(), config); + //Fix for support issue 7870 - the parent region needs to be able + //to discover that there is a persistent colocated child region. So + //if this is a child region, persist its config to the parent disk store + //as well. + if(colocatedWithRegion != null + && colocatedWithRegion.getDiskStore() != null + && colocatedWithRegion.getDiskStore() != dsi) { + colocatedWithRegion.getDiskStore().addPersistentPR(this.getFullPath(), config); + } + } + + } + } + + /** + * Initializes the PartitionedRegion meta data, adding this Node and starting + * the service on this node (if not already started). + * Made this synchronized for bug 41982 + * @return true if initialize was done; false if not because it was destroyed + */ + private synchronized boolean initPRInternals(InternalRegionArguments internalRegionArgs) { + + if (this.isLocallyDestroyed) { + // don't initialize if we are already destroyed for bug 41982 + return false; + } + /* Initialize the PartitionRegion */ + if (cache.isCacheAtShutdownAll()) { + throw new CacheClosedException("Cache is shutting down"); + } + validator.validateColocation(); + + //Do this after the validation, to avoid creating a persistent config + //for an invalid PR. + createAndValidatePersistentConfig(); + initializePartitionedRegion(); + + /* set the total number of buckets */ + // setTotalNumOfBuckets(); + // If localMaxMemory is set to 0, do not initialize Data Store. + final boolean storesData = this.localMaxMemory > 0; + if (storesData) { + initializeDataStore(this.getAttributes()); + } + + // register this PartitionedRegion, Create a PartitionRegionConfig and bind + // it into the allPartitionedRegion system wide Region. + // IMPORTANT: do this before advising peers that we have this region + registerPartitionedRegion(storesData); + + getRegionAdvisor().initializeRegionAdvisor(); // must be BEFORE initializeRegion call + getRegionAdvisor().addMembershipListener(this.advisorListener); // fix for bug 38719 + + // 3rd part of eviction attributes validation, after eviction attributes + // have potentially been published (by the first VM) but before buckets are created + validator.validateEvictionAttributesAgainstLocalMaxMemory(); + validator.validateFixedPartitionAttributes(); + + // Register with the other Nodes that have this region defined, this + // allows for an Advisor profile exchange, also notifies the Admin + // callbacks that this Region is created. + try { + new CreateRegionProcessor(this).initializeRegion(); + } catch (IllegalStateException e) { + // If this is a PARTITION_PROXY then retry region creation + // after toggling the concurrencyChecksEnabled flag. This is + // required because for persistent regions, we enforce concurrencyChecks + if (!this.isDataStore() && supportsConcurrencyChecks()) { + this.concurrencyChecksEnabled = !this.concurrencyChecksEnabled; + new CreateRegionProcessor(this).initializeRegion(); + } else { + throw e; + } + } + + if (!this.isDestroyed && !this.isLocallyDestroyed) { + // Register at this point so that other members are known + this.cache.getResourceManager().addResourceListener(ResourceType.MEMORY, this); + } + + // Create OQL indexes before starting GII. + createOQLIndexes(internalRegionArgs); + + // if any other services are dependent on notifications from this region, + // then we need to make sure that in-process ops are distributed before + // releasing the GII latches + if (this.isAllEvents()) { + StateFlushOperation sfo = new StateFlushOperation(getDistributionManager()); + try { + sfo.flush(this.distAdvisor.adviseAllPRNodes(), + getDistributionManager().getId(), + DistributionManager.HIGH_PRIORITY_EXECUTOR, false); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + getCancelCriterion().checkCancelInProgress(ie); + } + } + + releaseBeforeGetInitialImageLatch(); // moved to this spot for bug 36671 + + // requires prid assignment mthomas 4/3/2007 + getRegionAdvisor().processProfilesQueuedDuringInitialization(); + + releaseAfterBucketMetadataSetupLatch(); + + try { + if(storesData) { + if(this.redundancyProvider.recoverPersistentBuckets()) { + //Mark members as recovered from disk recursively, starting + //with the leader region. + PartitionedRegion leaderRegion = ColocationHelper.getLeaderRegion(this); + markRecoveredRecursively(leaderRegion); + } + } + } + catch (RegionDestroyedException rde) { + // Do nothing. + if (logger.isDebugEnabled()) { + logger.debug("initPRInternals: failed due to exception", rde); + } + } + + releaseAfterGetInitialImageLatch(); + + try { + if(storesData) { + this.redundancyProvider.scheduleCreateMissingBuckets(); + + if (this.redundantCopies > 0) { + this.redundancyProvider.startRedundancyRecovery(); + } + } + } + catch (RegionDestroyedException rde) { + // Do nothing. + if (logger.isDebugEnabled()) { + logger.debug("initPRInternals: failed due to exception", rde); + } + } + + return true; + } + + private void markRecoveredRecursively(PartitionedRegion region) { + region.setRecoveredFromDisk(); + for(PartitionedRegion colocatedRegion : ColocationHelper.getColocatedChildRegions(region)) { + markRecoveredRecursively(colocatedRegion); + } + } + + @Override + protected void postCreateRegion() { + super.postCreateRegion(); + CacheListener[] listeners = fetchCacheListenersField(); + if (listeners != null && listeners.length > 0) { + Set others = getRegionAdvisor().adviseGeneric(); + for (int i = 0; i < listeners.length; i++) { + if (listeners[i] instanceof RegionMembershipListener) { + RegionMembershipListener rml = (RegionMembershipListener)listeners[i]; + try { + DistributedMember[] otherDms = new DistributedMember[others + .size()]; + others.toArray(otherDms); + rml.initialMembers(this, otherDms); + } + catch (VirtualMachineError err) { + SystemFailure.initiateFailure(err); + // If this ever returns, rethrow the error. We're poisoned + // now, so don't let this thread continue. + throw err; + } + catch (Throwable t) { + // Whenever you catch Error or Throwable, you must also + // catch VirtualMachineError (see above). However, there is + // _still_ a possibility that you are dealing with a cascading + // error condition, so you also need to check to see if the JVM + // is still usable: + SystemFailure.checkFailure(); + logger.error(LocalizedMessage.create(LocalizedStrings.DistributedRegion_EXCEPTION_OCCURRED_IN_REGIONMEMBERSHIPLISTENER), t); + } + } + } + } + + PartitionListener[] partitionListeners = this.getPartitionListeners(); + if (partitionListeners != null && partitionListeners.length != 0) { + for (int i = 0; i < partitionListeners.length; i++) { + PartitionListener listener = partitionListeners[i]; + if (listener != null) { + listener.afterRegionCreate(this); + } + } + } + + Set allGatewaySenderIds = getAllGatewaySenderIds(); + if (!allGatewaySenderIds.isEmpty()) { + for (GatewaySender sender : cache.getAllGatewaySenders()) { + if (sender.isParallel() + && allGatewaySenderIds.contains(sender.getId())) { + /** + * get the ParallelGatewaySender to create the colocated partitioned + * region for this region. + */ + if (sender.isRunning() ) { + AbstractGatewaySender senderImpl = (AbstractGatewaySender)sender; + ((ConcurrentParallelGatewaySenderQueue)senderImpl.getQueues().toArray(new RegionQueue[1])[0]) + .addShadowPartitionedRegionForUserPR(this); + } + } + } + } + } + + /* + * Initializes the PartitionedRegion. OVERRIDES + */ + @Override + protected void initialize(InputStream snapshotInputStream, + InternalDistributedMember imageTarget, + InternalRegionArguments internalRegionArgs) throws TimeoutException, + ClassNotFoundException { + if (logger.isDebugEnabled()) { + logger.debug("PartitionedRegion#initialize {}", getName()); + } + RegionLogger.logCreate(getName(), getDistributionManager().getDistributionManagerId()); + + this.requiresNotification = this.cache.requiresNotificationFromPR(this); + initPRInternals(internalRegionArgs); + + if (logger.isDebugEnabled()) { + logger.debug("PartitionRegion#initialize: finished with {}", this); + } + this.cache.addPartitionedRegion(this); + + } + + /** + * Initializes the Node for this Map. + */ + private Node initializeNode() { + return new Node(getDistributionManager().getId(), + SERIAL_NUMBER_GENERATOR.getAndIncrement()); + } + + /** + * receive notification that a bridge server or wan gateway has been created + * that requires notification of cache events from this region + */ + public void cacheRequiresNotification() { + if (!this.requiresNotification + && !(this.isClosed || this.isLocallyDestroyed)) { + // tell others of the change in status + this.requiresNotification = true; + new UpdateAttributesProcessor(this).distribute(false); + } + } + + @Override + void distributeUpdatedProfileOnHubCreation() + { + if (!(this.isClosed || this.isLocallyDestroyed)) { + // tell others of the change in status + this.requiresNotification = true; + new UpdateAttributesProcessor(this).distribute(false); + } + } + + @Override + void distributeUpdatedProfileOnSenderCreation() + { + if (!(this.isClosed || this.isLocallyDestroyed)) { + // tell others of the change in status + this.requiresNotification = true; + new UpdateAttributesProcessor(this).distribute(false); + } + } + + public void addGatewaySenderId(String gatewaySenderId) { + super.addGatewaySenderId(gatewaySenderId); + new UpdateAttributesProcessor(this).distribute(); + ((PartitionedRegion)this).distributeUpdatedProfileOnSenderCreation(); + GatewaySender sender = getCache().getGatewaySender(gatewaySenderId); + if (sender!= null && sender.isParallel() && sender.isRunning()) { + AbstractGatewaySender senderImpl = (AbstractGatewaySender)sender; + ((ConcurrentParallelGatewaySenderQueue)senderImpl.getQueues().toArray( + new RegionQueue[1])[0]).addShadowPartitionedRegionForUserPR(this); + } + } + + public void removeGatewaySenderId(String gatewaySenderId){ + super.removeGatewaySenderId(gatewaySenderId); + new UpdateAttributesProcessor(this).distribute(); + } + + public void addAsyncEventQueueId(String asyncEventQueueId) { + super.addAsyncEventQueueId(asyncEventQueueId); + new UpdateAttributesProcessor(this).distribute(); + ((PartitionedRegion)this).distributeUpdatedProfileOnSenderCreation(); + GatewaySender sender = getCache().getGatewaySender(AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncEventQueueId)); + if (sender!= null && sender.isParallel() && sender.isRunning()) { + AbstractGatewaySender senderImpl = (AbstractGatewaySender)sender; + ((ConcurrentParallelGatewaySenderQueue)senderImpl.getQueues().toArray( + new RegionQueue[1])[0]).addShadowPartitionedRegionForUserPR(this); + } + } + + public void removeAsyncEventQueueId(String asyncEventQueueId) { + super.removeAsyncEventQueueId(asyncEventQueueId); + new UpdateAttributesProcessor(this).distribute(); + } + + public void checkSameSenderIdsAvailableOnAllNodes() { + List senderIds = this.getCacheDistributionAdvisor() + .adviseSameGatewaySenderIds(getGatewaySenderIds()); + if (!senderIds.isEmpty()) { + throw new GatewaySenderConfigurationException( + LocalizedStrings.Region_REGION_0_HAS_1_GATEWAY_SENDER_IDS_ANOTHER_CACHE_HAS_THE_SAME_REGION_WITH_2_GATEWAY_SENDER_IDS_FOR_REGION_ACROSS_ALL_MEMBERS_IN_DS_GATEWAY_SENDER_IDS_SHOULD_BE_SAME + .toLocalizedString(new Object[] { this.getName(), + senderIds.get(0), senderIds.get(1) })); + } + + List asycnQueueIds = this.getCacheDistributionAdvisor() + .adviseSameAsyncEventQueueIds(getAsyncEventQueueIds()); + if (!asycnQueueIds.isEmpty()) { + throw new GatewaySenderConfigurationException( + LocalizedStrings.Region_REGION_0_HAS_1_ASYNC_EVENT_QUEUE_IDS_ANOTHER_CACHE_HAS_THE_SAME_REGION_WITH_2_ASYNC_EVENT_QUEUE_IDS_FOR_REGION_ACROSS_ALL_MEMBERS_IN_DS_ASYNC_EVENT_QUEUE_IDS_SHOULD_BE_SAME + .toLocalizedString(new Object[] { this.getName(), + asycnQueueIds.get(0), asycnQueueIds.get(1) })); + } + } + + /** + * Initializes the PartitionedRegion - create the Global regions for storing + * the PartitiotnedRegion configs. + */ + private void initializePartitionedRegion() { + this.prRoot = PartitionedRegionHelper.getPRRoot(getCache()); + } + + @Override + public void remoteRegionInitialized(CacheProfile profile) { + if (isInitialized() && hasListener()) { + Object callback = DistributedRegion.TEST_HOOK_ADD_PROFILE? profile : null; + RegionEventImpl event = new RegionEventImpl(PartitionedRegion.this, + Operation.REGION_CREATE, callback, true, profile.peerMemberId); + dispatchListenerEvent(EnumListenerEvent.AFTER_REMOTE_REGION_CREATE, + event); + } + } + + /** + * This method initializes the partitionedRegionDataStore for this PR. + * + * @param ra + * Region attributes + */ + private void initializeDataStore(RegionAttributes ra) { + + this.dataStore = PartitionedRegionDataStore.createDataStore(cache, this, ra + .getPartitionAttributes()); + } + + protected DistributedLockService getPartitionedRegionLockService() { + return getGemFireCache().getPartitionedRegionLockService(); + } + + /** + * Register this PartitionedRegion by: 1) Create a PartitionRegionConfig and + * 2) Bind it into the allPartitionedRegion system wide Region. + * + * @param storesData + * which indicates whether the instance in this cache stores + * data, effecting the Nodes PRType + * + * @see Node#setPRType(int) + */ + private void registerPartitionedRegion(boolean storesData) { + // Register this ParitionedRegion. First check if the ParitionedRegion + // entry already exists globally. + PartitionRegionConfig prConfig = null; + PartitionAttributes prAttribs = getAttributes().getPartitionAttributes(); + if (storesData) { + if (this.fixedPAttrs != null) { + this.node.setPRType(Node.FIXED_PR_DATASTORE); + } else { + this.node.setPRType(Node.ACCESSOR_DATASTORE); + } + this.node.setPersistence(getAttributes().getDataPolicy() == DataPolicy.PERSISTENT_PARTITION); + byte loaderByte = (byte)(getAttributes().getCacheLoader() != null ? 0x01 : 0x00); + byte writerByte = (byte)(getAttributes().getCacheWriter() != null ? 0x02 : 0x00); + this.node.setLoaderWriterByte((byte)(loaderByte + writerByte)); + } + else { + if (this.fixedPAttrs != null) { + this.node.setPRType(Node.FIXED_PR_ACCESSOR); + } else { + this.node.setPRType(Node.ACCESSOR); + } + } + final RegionLock rl = getRegionLock(); + try { + // if (!rl.lock()) { + if (logger.isDebugEnabled()) { + logger.debug("registerPartitionedRegion: obtaining lock"); + } + rl.lock(); + checkReadiness(); + + prConfig = this.prRoot.get(getRegionIdentifier()); + + if (prConfig == null) { + validateParalleGatewaySenderIds(); + this.partitionedRegionId = generatePRId(getSystem()); + prConfig = new PartitionRegionConfig(this.partitionedRegionId, + this.getFullPath(), prAttribs, this.getScope(), + getAttributes().getEvictionAttributes(), + getAttributes().getRegionIdleTimeout(), + getAttributes().getRegionTimeToLive(), + getAttributes().getEntryIdleTimeout(), + getAttributes().getEntryTimeToLive(), + this.getAllGatewaySenderIds()); + logger.info(LocalizedMessage.create(LocalizedStrings.PartitionedRegion_PARTITIONED_REGION_0_IS_BORN_WITH_PRID_1_IDENT_2, + new Object[] { getFullPath(), Integer.valueOf(this.partitionedRegionId), getRegionIdentifier()})); + + PRSanityCheckMessage.schedule(this); + } + else { + validator.validatePartitionAttrsFromPRConfig(prConfig); + if (storesData) { + validator.validatePersistentMatchBetweenDataStores(prConfig); + validator.validateCacheLoaderWriterBetweenDataStores(prConfig); + validator.validateFixedPABetweenDataStores(prConfig); + } + + this.partitionedRegionId = prConfig.getPRId(); + logger.info(LocalizedMessage.create(LocalizedStrings.PartitionedRegion_PARTITIONED_REGION_0_IS_CREATED_WITH_PRID_1, + new Object[] { getFullPath(), Integer.valueOf(this.partitionedRegionId)})); + } + + synchronized (prIdToPR) { + prIdToPR.put(Integer.valueOf(this.partitionedRegionId), this); // last + } + prConfig.addNode(this.node); + if (this.getFixedPartitionAttributesImpl() != null) { + calculateStartingBucketIDs(prConfig); + } + updatePRConfig(prConfig, false); + /* + * try { if (this.redundantCopies > 0) { if (storesData) { + * this.dataStore.grabBackupBuckets(false); } } } catch + * (RegionDestroyedException rde) { if (!this.isClosed) throw rde; } + */ + this.cleanPRRegistration = true; + } + catch (LockServiceDestroyedException lsde) { + if (logger.isDebugEnabled()) { + logger.debug("registerPartitionedRegion: unable to obtain lock for {}", this); + } + cleanupFailedInitialization(); + throw new PartitionedRegionException( + LocalizedStrings.PartitionedRegion_CAN_NOT_CREATE_PARTITIONEDREGION_FAILED_TO_ACQUIRE_REGIONLOCK + .toLocalizedString(), lsde); + } + catch (IllegalStateException ill) { + cleanupFailedInitialization(); + throw ill; + } + catch (VirtualMachineError err) { + SystemFailure.initiateFailure(err); + // If this ever returns, rethrow the error. We're poisoned + // now, so don't let this thread continue. + throw err; + } + catch (Throwable t) { + // Whenever you catch Error or Throwable, you must also + // catch VirtualMachineError (see above). However, there is + // _still_ a possibility that you are dealing with a cascading + // error condition, so you also need to check to see if the JVM + // is still usable: + SystemFailure.checkFailure(); + String registerErrMsg = + LocalizedStrings.PartitionedRegion_AN_EXCEPTION_WAS_CAUGHT_WHILE_REGISTERING_PARTITIONEDREGION_0_DUMPPRID_1 + .toLocalizedString(new Object[] {getFullPath(), prIdToPR.dump()}); + try { + synchronized (prIdToPR) { + if (prIdToPR.containsKey(Integer.valueOf(this.partitionedRegionId))) { + prIdToPR.put(Integer.valueOf(this.partitionedRegionId), + PRIdMap.FAILED_REGISTRATION, false); + logger.info(LocalizedMessage.create(LocalizedStrings.PartitionedRegion_FAILED_REGISTRATION_PRID_0_NAMED_1, + new Object[] {Integer.valueOf(this.partitionedRegionId), this.getName()})); + } + } + } + catch (VirtualMachineError err) { + SystemFailure.initiateFailure(err); + // If this ever returns, rethrow the error. We're poisoned + // now, so don't let this thread continue. + throw err; + } + catch (Throwable ignore) { + // Whenever you catch Error or Throwable, you must also + // catch VirtualMachineError (see above). However, there is + // _still_ a possibility that you are dealing with a cascading + // error condition, so you also need to check to see if the JVM + // is still usable: + SystemFailure.checkFailure(); + if (logger.isDebugEnabled()) { + logger.debug("Partitioned Region creation, could not clean up after caught exception", ignore); + } + } + throw new PartitionedRegionException(registerErrMsg, t); + } + finally { + try { + rl.unlock(); + if (logger.isDebugEnabled()) { + logger.debug("registerPartitionedRegion: released lock"); + } + } + catch (Exception es) { + if (logger.isDebugEnabled()) { + logger.warn(es.getMessage(), es); + } + } + } + } + + public void validateParalleGatewaySenderIds() throws PRLocallyDestroyedException{ + for (String senderId : this.getParallelGatewaySenderIds()) { + for (PartitionRegionConfig config : this.prRoot.values()) { + if (config.getGatewaySenderIds().contains(senderId)) { + Map colocationMap = ColocationHelper + .getAllColocationRegions(this); + if (!colocationMap.isEmpty()) { + if (colocationMap.containsKey(config.getFullPath())) { + continue; + } + else { + int prID = config.getPRId(); + PartitionedRegion colocatedPR = PartitionedRegion + .getPRFromId(prID); + PartitionedRegion leader = ColocationHelper + .getLeaderRegion(colocatedPR); + if (colocationMap.containsValue(leader)) { + continue; + } + else { + throw new IllegalStateException( + LocalizedStrings.PartitionRegion_NON_COLOCATED_REGIONS_1_2_CANNOT_HAVE_SAME_PARALLEL_GATEWAY_SENDER_ID_2.toString(new Object[] { + this.getFullPath(), + config.getFullPath(), + senderId.contains(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX) ? "async event queue": "gateway sender", + senderId })); + } + } + } + else { + throw new IllegalStateException( + LocalizedStrings.PartitionRegion_NON_COLOCATED_REGIONS_1_2_CANNOT_HAVE_SAME_PARALLEL_GATEWAY_SENDER_ID_2.toString(new Object[] { + this.getFullPath(), + config.getFullPath(), + senderId.contains(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX) ? "async event queue": "gateway sender", + senderId })); + } + + } + } + } + } + + /** + * @return whether this region requires event notification for all cache + * content changes from other nodes + */ + public boolean getRequiresNotification() { + return this.requiresNotification; + } + + /** + * Get the Partitioned Region identifier used for DLocks (Bucket and Region) + */ + final public String getRegionIdentifier() { + return this.regionIdentifier; + } + + void setRecoveredFromDisk() { + this.recoveredFromDisk = true; + new UpdateAttributesProcessor(this).distribute(false); + } + + public final void updatePRConfig(PartitionRegionConfig prConfig, + boolean putOnlyIfUpdated) { + final Set nodes = prConfig.getNodes(); + final PartitionedRegion colocatedRegion = ColocationHelper + .getColocatedRegion(this); + RegionLock colocatedLock = null; + boolean colocatedLockAcquired = false; + try { + boolean colocationComplete = false; + if (colocatedRegion != null && !prConfig.isColocationComplete() && + // if the current node is marked uninitialized (SQLF DDL replay in + // progress) then colocation will definitely not be marked complete so + // avoid taking the expensive region lock + !getCache().isUnInitializedMember(getDistributionManager().getId())) { + colocatedLock = colocatedRegion.getRegionLock(); + colocatedLock.lock(); + colocatedLockAcquired = true; + final PartitionRegionConfig parentConf = this.prRoot + .get(colocatedRegion.getRegionIdentifier()); + if (parentConf.isColocationComplete() + && parentConf.hasSameDataStoreMembers(prConfig)) { + colocationComplete = true; + // check if all the nodes have been initialized (SQLF bug #42089) + for (Node node : nodes) { + if (getCache().isUnInitializedMember(node.getMemberId())) { + colocationComplete = false; + break; + } + } + if (colocationComplete) { + prConfig.setColocationComplete(); + } + } + } + + if(isDataStore() && !prConfig.isFirstDataStoreCreated()) { + prConfig.setDatastoreCreated(getEvictionAttributes()); + } + // N.B.: this put could fail with a CacheClosedException: + if (!putOnlyIfUpdated || colocationComplete) { + this.prRoot.put(getRegionIdentifier(), prConfig); + } + } finally { + if (colocatedLockAcquired) { + colocatedLock.unlock(); + } + } + } + + /** + * + * @param keyInfo + * @param access + * true if caller wants last accessed time updated + * @param allowTombstones - whether a tombstone can be returned + * @return TODO + */ + @Override + protected Region.Entry nonTXGetEntry(KeyInfo keyInfo, boolean access, boolean allowTombstones) { + final long startTime = PartitionedRegionStats.startTime(); + final Object key = keyInfo.getKey(); + try { + int bucketId = keyInfo.getBucketId(); + if (bucketId == KeyInfo.UNKNOWN_BUCKET) { + bucketId = PartitionedRegionHelper.getHashKey(this, + Operation.GET_ENTRY, key, null, null); + keyInfo.setBucketId(bucketId); + } + InternalDistributedMember targetNode = getOrCreateNodeForBucketRead(bucketId); + return getEntryInBucket(targetNode, bucketId, key, access, allowTombstones); + } + finally { + this.prStats.endGetEntry(startTime); + } + } + + protected EntrySnapshot getEntryInBucket( + final DistributedMember targetNode, final int bucketId, + final Object key, boolean access, final boolean allowTombstones) { + final int retryAttempts = calcRetry(); + if (logger.isTraceEnabled()) { + logger.trace("getEntryInBucket: " + "Key key={} ({}) from: {} bucketId={}", + key, key.hashCode(), targetNode, bucketStringForLogs(bucketId)); + } + Integer bucketIdInt = Integer.valueOf(bucketId); + EntrySnapshot ret = null; + int count = 0; + RetryTimeKeeper retryTime = null; + InternalDistributedMember retryNode = (InternalDistributedMember)targetNode; + while (count <= retryAttempts) { + // Every continuation should check for DM cancellation + if (retryNode == null) { + checkReadiness(); + if (retryTime == null) { + retryTime = new RetryTimeKeeper(this.retryTimeout); + } + if (retryTime.overMaximum()) { + break; + } + retryNode = getOrCreateNodeForBucketRead(bucketId); + + // No storage found for bucket, early out preventing hot loop, bug 36819 + if (retryNode == null) { + checkShutdown(); + return null; + } + continue; + } + try { + final boolean loc = (this.localMaxMemory > 0) && retryNode.equals(getMyId()); + if (loc) { + ret = this.dataStore.getEntryLocally(bucketId, key, access, allowTombstones, true); + } else { + ret = getEntryRemotely(retryNode, bucketIdInt, key, access, allowTombstones); + // TODO:Suranjan&Yogesh : there should be better way than this one + String name = Thread.currentThread().getName(); + if (name.startsWith("ServerConnection") + && !getMyId().equals(targetNode)) { + setNetworkHop(bucketIdInt, (InternalDistributedMember)targetNode); + } + } + + return ret; + } + catch (PRLocallyDestroyedException pde) { + if (logger.isDebugEnabled()) { + logger.debug("getEntryInBucket: Encountered PRLocallyDestroyedException "); + } + checkReadiness(); + } + catch (EntryNotFoundException enfe) { + return null; + } + catch (ForceReattemptException prce) { + prce.checkKey(key); + if (logger.isDebugEnabled()) { + logger.debug("getEntryInBucket: retrying, attempts so far: {}", count, prce); + } + checkReadiness(); + InternalDistributedMember lastNode = retryNode; + retryNode = getOrCreateNodeForBucketRead(bucketIdInt.intValue()); + if (lastNode.equals(retryNode)) { + if (retryTime == null) { + retryTime = new RetryTimeKeeper(this.retryTimeout); + } + if (retryTime.overMaximum()) { + break; + } + retryTime.waitToRetryNode(); + } + } + catch (PrimaryBucketException notPrimary) { + if (logger.isDebugEnabled()) { + logger.debug("Bucket {} on Node {} not primary", notPrimary.getLocalizedMessage(), retryNode); + } + getRegionAdvisor().notPrimary(bucketIdInt.intValue(), retryNode); + retryNode = getOrCreateNodeForBucketRead(bucketIdInt.intValue()); + } + + // It's possible this is a GemFire thread e.g. ServerConnection + // which got to this point because of a distributed system shutdown or + // region closure which uses interrupt to break any sleep() or wait() + // calls + // e.g. waitForPrimary + checkShutdown(); + + count++; + if (count == 1) { + this.prStats.incContainsKeyValueOpsRetried(); + } + this.prStats.incContainsKeyValueRetries(); + + } + + PartitionedRegionDistributionException e = null; // Fix for bug 36014 + if (logger.isDebugEnabled()) { + e = new PartitionedRegionDistributionException(LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GETENTRY_IN_0_ATTEMPTS.toLocalizedString(Integer.valueOf(count))); + } + logger.warn(LocalizedMessage.create(LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GETENTRY_IN_0_ATTEMPTS, Integer.valueOf(count)), e); + return null; + } + + /** + * Check for region closure, region destruction, cache closure as well as + * distributed system disconnect. As of 6/21/2007, there were at least four + * volatile variables reads and one synchonrization performed upon completion + * of this method. + */ + private void checkShutdown() { + checkReadiness(); + this.cache.getCancelCriterion().checkCancelInProgress(null); + } + + /** + * Checks if a key is contained remotely. + * + * @param targetNode + * the node where bucket region for the key exists. + * @param bucketId + * the bucket id for the key. + * @param key + * the key, whose value needs to be checks + * @param access + * true if caller wants last access time updated + * @param allowTombstones whether tombstones should be returned + * @throws EntryNotFoundException + * if the entry doesn't exist + * @throws ForceReattemptException + * if the peer is no longer available + * @throws PrimaryBucketException + * @return true if the passed key is contained remotely. + */ + public EntrySnapshot getEntryRemotely( + InternalDistributedMember targetNode, + Integer bucketId, Object key, boolean access, boolean allowTombstones) + throws EntryNotFoundException, PrimaryBucketException, + ForceReattemptException { + FetchEntryResponse r = FetchEntryMessage + .send(targetNode, this, key, access); + this.prStats.incPartitionMessagesSent(); + EntrySnapshot entry = r.waitForResponse(); + if (entry != null && entry.getRawValue() == Token.TOMBSTONE){ + if (!allowTombstones) { + return null; + } + } + return entry; + } + + // ///////////////////////////////////////////////////////////////// + // Following methods would throw, operation Not Supported Exception + // ///////////////////////////////////////////////////////////////// + + /** + * @since 5.0 + * @throws UnsupportedOperationException + * OVERRIDES + */ + @Override + public void becomeLockGrantor() { + throw new UnsupportedOperationException(); + } + + /** + * @since 5.0 + * @throws UnsupportedOperationException + * OVERRIDES + */ + @Override + final public Region createSubregion(String subregionName, + RegionAttributes regionAttributes) throws RegionExistsException, + TimeoutException { + throw new UnsupportedOperationException(); + } + + /** + * @since 5.0 + * @throws UnsupportedOperationException + * OVERRIDES + */ + @Override + public Lock getDistributedLock(Object key) throws IllegalStateException { + throw new UnsupportedOperationException(); + } + + /** + * @since 5.0 + * @throws UnsupportedOperationException + * OVERRIDES + */ + @Override + public CacheStatistics getStatistics() { + throw new UnsupportedOperationException(); + } + + /** + * @since 5.0 + * @throws UnsupportedOperationException + * OVERRIDES + */ + public Region getSubregion() { + throw new UnsupportedOperationException(); + } + + /** + * @since 5.0 + * @throws UnsupportedOperationException + * OVERRIDES + */ + @Override + public Lock getRegionDistributedLock() throws IllegalStateException { + + throw new UnsupportedOperationException(); + } + + /** + * @since 5.0 + * @throws UnsupportedOperationException + * OVERRIDES + */ + @Override + public void loadSnapshot(InputStream inputStream) throws IOException, + ClassNotFoundException, CacheWriterException, TimeoutException { + throw new UnsupportedOperationException(); + } + + /** + * Should it destroy entry from local accessor????? + * OVERRIDES + */ + @Override + public void localDestroy(Object key, Object aCallbackArgument) + throws EntryNotFoundException { + + throw new UnsupportedOperationException(); + } + + /** + * @since 5.0 + * @throws UnsupportedOperationException + * OVERRIDES + */ + @Override + public void localInvalidate(Object key, Object aCallbackArgument) + throws EntryNotFoundException { + + throw new UnsupportedOperationException(); + } + + /** + * @since 5.0 + * @throws UnsupportedOperationException + * OVERRIDES + */ + @Override + public void localInvalidateRegion(Object aCallbackArgument) { + getDataView().checkSupportsRegionInvalidate(); + throw new UnsupportedOperationException(); + } + + /** + * Executes a query on this PartitionedRegion. The restrictions have already + * been checked. The query is a SELECT expression, and the only region it + * refers to is this region. + * + * @see DefaultQuery#execute() + * + * @since 5.1 + */ + public Object executeQuery(DefaultQuery query, Object[] parameters, + Set buckets) throws FunctionDomainException, TypeMismatchException, + NameResolutionException, QueryInvocationTargetException { + for (;;) { + try { + return doExecuteQuery(query, parameters, buckets); + } catch (ForceReattemptException fre) { + // fall through and loop + } + } + } + /** + * If ForceReattemptException is thrown then the caller must loop and call us again. + * @throws ForceReattemptException if one of the buckets moved out from under us + */ + private Object doExecuteQuery(DefaultQuery query, Object[] parameters, + Set buckets) + throws FunctionDomainException, TypeMismatchException, + NameResolutionException, QueryInvocationTargetException, + ForceReattemptException + { + if (logger.isDebugEnabled()) { + logger.debug("Executing query :{}", query); + } + + HashSet allBuckets = new HashSet(); + + if (buckets==null) { // remote buckets + final Iterator remoteIter = getRegionAdvisor().getBucketSet().iterator(); + try { + while (remoteIter.hasNext()) { + allBuckets.add((Integer)remoteIter.next()); + } + } + catch (NoSuchElementException stop) { + } + } + else { // local buckets + Iterator localIter = null; + if (this.dataStore != null) { + localIter = buckets.iterator(); + } + else { + localIter = Collections.EMPTY_SET.iterator(); + } + try { + while (localIter.hasNext()) { + allBuckets.add((Integer)localIter.next()); + } + } + catch (NoSuchElementException stop) { + } + } + + if (allBuckets.size() == 0) { + if (logger.isDebugEnabled()) { + logger.debug("No bucket storage allocated. PR has no data yet."); + } + ResultsSet resSet = new ResultsSet(); + resSet.setElementType(new ObjectTypeImpl( + this.getValueConstraint() == null ? Object.class : this + .getValueConstraint())); + return resSet; + } + + CompiledSelect selectExpr = query.getSimpleSelect(); + if (selectExpr == null) { + throw new IllegalArgumentException( + LocalizedStrings. + PartitionedRegion_QUERY_MUST_BE_A_SELECT_EXPRESSION_ONLY + .toLocalizedString()); + } + + // this can return a BAG even if it's a DISTINCT select expression, + // since the expectation is that the duplicates will be removed at the end + SelectResults results = selectExpr + .getEmptyResultSet(parameters, getCache(), query); + + PartitionedRegionQueryEvaluator prqe = new PartitionedRegionQueryEvaluator(this.getSystem(), this, query, + parameters, results, allBuckets); + for (;;) { + this.getCancelCriterion().checkCancelInProgress(null); + boolean interrupted = Thread.interrupted(); + try { + results = prqe.queryBuckets(null); + break; + } + catch (InterruptedException e) { + interrupted = true; + } + catch (FunctionDomainException e) { + throw e; + } + catch (TypeMismatchException e) { + throw e; + } + catch (NameResolutionException e) { + throw e; + } + catch (QueryInvocationTargetException e) { + throw e; + } + catch (QueryException qe) { + throw new QueryInvocationTargetException(LocalizedStrings.PartitionedRegion_UNEXPECTED_QUERY_EXCEPTION_OCCURED_DURING_QUERY_EXECUTION_0.toLocalizedString(qe.getMessage()), qe); + } + finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } // for + + // Drop Duplicates if this is a DISTINCT query + boolean allowsDuplicates = results.getCollectionType().allowsDuplicates(); + //Asif: No need to apply the limit to the SelectResults. + // We know that even if we do not apply the limit, + //the results will satisfy the limit + // as it has been evaluated in the iteration of List to + // populate the SelectsResuts + //So if the results is instance of ResultsBag or is a StructSet or + // a ResultsSet, if the limit exists, the data set size will + // be exactly matching the limit + if (selectExpr.isDistinct()) { + // don't just convert to a ResultsSet (or StructSet), since + // the bags can convert themselves to a Set more efficiently + ObjectType elementType = results.getCollectionType().getElementType(); + if (selectExpr.getOrderByAttrs() != null) { + // Set limit also, its not applied while building the final result set as order by is involved. + // results = new ResultsCollectionWrapper(elementType, results.asSet(), query.getLimit(parameters)); + } else if (allowsDuplicates) { + results = new ResultsCollectionWrapper(elementType, results.asSet()); + } + if (selectExpr.isCount() && (results.isEmpty() || selectExpr.isDistinct())) { + SelectResults resultCount = new ResultsBag(getCachePerfStats());//Constructor with elementType not visible. + resultCount.setElementType(new ObjectTypeImpl(Integer.class)); + ((ResultsBag)resultCount).addAndGetOccurence(results.size()); + return resultCount; + } + } + return results; + } + + /** + * @since 5.0 + * @throws UnsupportedOperationException + * OVERRIDES + */ + @Override + public void saveSnapshot(OutputStream outputStream) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * @since 5.0 + * @throws UnsupportedOperationException + * OVERRIDES + */ + @Override + public void writeToDisk() { + throw new UnsupportedOperationException(); + } + + /** + * @since 5.0 + * @throws UnsupportedOperationException + * OVERRIDES + */ + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + @Override + void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) { + throw new UnsupportedOperationException(); + } + + @Override + void basicLocalClear(RegionEventImpl event) { + throw new UnsupportedOperationException(); + } + + // ///////////////////////////////////////////////////////////////////// + // ////////////// Operation Supported for this release + // ////////////////////////////// + // ///////////////////////////////////////////////////////////////////// + + @Override + boolean virtualPut(EntryEventImpl event, + boolean ifNew, + boolean ifOld, + Object expectedOldValue, + boolean requireOldValue, + long lastModified, + boolean overwriteDestroyed) + throws TimeoutException, CacheWriterException { + final long startTime = PartitionedRegionStats.startTime(); + boolean result = false; + final DistributedPutAllOperation putAllOp_save = event.setPutAllOperation(null); + + if (event.getEventId() == null) { + event.setNewEventId(this.cache.getDistributedSystem()); + } + boolean bucketStorageAssigned = true; + try { + final Integer bucketId = event.getKeyInfo().getBucketId(); + assert bucketId != KeyInfo.UNKNOWN_BUCKET; + // check in bucket2Node region + InternalDistributedMember targetNode = getNodeForBucketWrite(bucketId + .intValue(), null); + // force all values to be serialized early to make size computation cheap + // and to optimize distribution. + if (logger.isDebugEnabled()) { + logger.debug("PR.virtualPut putting event={}", event); + } + + if (targetNode == null) { + try { + bucketStorageAssigned=false; + // if this is a Delta update, then throw exception since the key doesn't + // exist if there is no bucket for it yet + // For HDFS region, we will recover key, so allow bucket creation + if (!this.dataPolicy.withHDFS() && event.hasDelta()) { + throw new EntryNotFoundException(LocalizedStrings. + PartitionedRegion_CANNOT_APPLY_A_DELTA_WITHOUT_EXISTING_ENTRY + .toLocalizedString()); + } + targetNode = createBucket(bucketId.intValue(), event.getNewValSizeForPR(), + null); + } + catch (PartitionedRegionStorageException e) { + // try not to throw a PRSE if the cache is closing or this region was + // destroyed during createBucket() (bug 36574) + this.checkReadiness(); + if (this.cache.isClosed()) { + throw new RegionDestroyedException(toString(), getFullPath()); + } + throw e; + } + } + + if (event.isBridgeEvent() && bucketStorageAssigned) { + setNetworkHop(bucketId, targetNode); + } + if (putAllOp_save == null) { + result = putInBucket(targetNode, + bucketId, + event, + ifNew, + ifOld, + expectedOldValue,