Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CA888200C6F for ; Tue, 9 May 2017 19:39:48 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C9328160BB6; Tue, 9 May 2017 17:39:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BCD0D160BD5 for ; Tue, 9 May 2017 19:39:38 +0200 (CEST) Received: (qmail 66238 invoked by uid 500); 9 May 2017 17:39:37 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 65539 invoked by uid 99); 9 May 2017 17:39:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 May 2017 17:39:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 20637F16BB; Tue, 9 May 2017 17:39:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: klund@apache.org To: commits@geode.apache.org Date: Tue, 09 May 2017 17:39:51 -0000 Message-Id: <31c48624bbf741d1818ce68ffeaf1732@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/49] geode git commit: GEODE-2632: change dependencies on GemFireCacheImpl to InternalCache archived-at: Tue, 09 May 2017 17:39:49 -0000 http://git-wip-us.apache.org/repos/asf/geode/blob/4b489e56/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/BackupManager.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/BackupManager.java b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/BackupManager.java index d2672df..f464e0d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/BackupManager.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/BackupManager.java @@ -14,21 +14,6 @@ */ package org.apache.geode.internal.cache.persistence; -import org.apache.commons.io.FileUtils; -import org.apache.geode.InternalGemFireError; -import org.apache.geode.cache.persistence.PersistentID; -import org.apache.geode.distributed.DistributedSystem; -import org.apache.geode.distributed.internal.DM; -import org.apache.geode.distributed.internal.DistributionConfig; -import org.apache.geode.distributed.internal.MembershipListener; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.ClassPathLoader; -import org.apache.geode.internal.DeployedJar; -import org.apache.geode.internal.JarDeployer; -import org.apache.geode.internal.cache.DiskStoreImpl; -import org.apache.geode.internal.cache.GemFireCacheImpl; -import org.apache.geode.internal.i18n.LocalizedStrings; - import java.io.File; import java.io.FileOutputStream; import java.io.FilenameFilter; @@ -44,10 +29,25 @@ import java.util.concurrent.CountDownLatch; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.io.FileUtils; + +import org.apache.geode.InternalGemFireError; +import org.apache.geode.cache.DiskStore; +import org.apache.geode.cache.persistence.PersistentID; +import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.distributed.internal.DM; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import 
org.apache.geode.internal.ClassPathLoader; +import org.apache.geode.internal.DeployedJar; +import org.apache.geode.internal.JarDeployer; +import org.apache.geode.internal.cache.DiskStoreImpl; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.i18n.LocalizedStrings; + /** * This class manages the state an logic to backup a single cache. - * - * */ public class BackupManager implements MembershipListener { @@ -58,11 +58,11 @@ public class BackupManager implements MembershipListener { public static final String USER_FILES = "user"; public static final String CONFIG = "config"; private InternalDistributedMember sender; - private GemFireCacheImpl cache; + private InternalCache cache; private CountDownLatch allowDestroys = new CountDownLatch(1); private volatile boolean isCancelled = false; - public BackupManager(InternalDistributedMember sender, GemFireCacheImpl gemFireCache) { + public BackupManager(InternalDistributedMember sender, InternalCache gemFireCache) { this.sender = sender; this.cache = gemFireCache; } @@ -81,9 +81,9 @@ public class BackupManager implements MembershipListener { private void cleanup() { isCancelled = true; allowDestroys.countDown(); - Collection diskStores = cache.listDiskStoresIncludingRegionOwned(); - for (DiskStoreImpl store : diskStores) { - store.releaseBackupLock(); + Collection diskStores = cache.listDiskStoresIncludingRegionOwned(); + for (DiskStore store : diskStores) { + ((DiskStoreImpl) store).releaseBackupLock(); } final DM distributionManager = cache.getInternalDistributedSystem().getDistributionManager(); distributionManager.removeAllMembershipListener(this); @@ -92,12 +92,13 @@ public class BackupManager implements MembershipListener { public HashSet prepareBackup() { HashSet persistentIds = new HashSet(); - Collection diskStores = cache.listDiskStoresIncludingRegionOwned(); - for (DiskStoreImpl store : diskStores) { - store.lockStoreBeforeBackup(); - if (store.hasPersistedData()) { - 
persistentIds.add(store.getPersistentID()); - store.getStats().startBackup(); + Collection diskStores = cache.listDiskStoresIncludingRegionOwned(); + for (DiskStore store : diskStores) { + DiskStoreImpl storeImpl = (DiskStoreImpl) store; + storeImpl.lockStoreBeforeBackup(); + if (storeImpl.hasPersistedData()) { + persistentIds.add(storeImpl.getPersistentID()); + storeImpl.getStats().startBackup(); } } return persistentIds; @@ -116,9 +117,10 @@ public class BackupManager implements MembershipListener { /* * Find the first matching DiskStoreId directory for this member. */ - for (DiskStoreImpl diskStore : cache.listDiskStoresIncludingRegionOwned()) { + for (DiskStore diskStore : cache.listDiskStoresIncludingRegionOwned()) { File[] matchingFiles = baselineParentDir.listFiles(new FilenameFilter() { - Pattern pattern = Pattern.compile(".*" + diskStore.getBackupDirName() + "$"); + Pattern pattern = + Pattern.compile(".*" + ((DiskStoreImpl) diskStore).getBackupDirName() + "$"); public boolean accept(File dir, String name) { Matcher m = pattern.matcher(name); @@ -142,7 +144,6 @@ public class BackupManager implements MembershipListener { * option. May be null if the user specified a full backup. * @return null if the backup is to be a full backup otherwise return the data store directory in * the previous backup for this member (if incremental). 
- * @throws IOException */ private File checkBaseline(File baselineParentDir) throws IOException { File baselineDir = null; @@ -188,12 +189,12 @@ public class BackupManager implements MembershipListener { File storesDir = new File(backupDir, DATA_STORES); RestoreScript restoreScript = new RestoreScript(); HashSet persistentIds = new HashSet(); - Collection diskStores = - new ArrayList(cache.listDiskStoresIncludingRegionOwned()); + Collection diskStores = + new ArrayList(cache.listDiskStoresIncludingRegionOwned()); boolean foundPersistentData = false; - for (Iterator itr = diskStores.iterator(); itr.hasNext();) { - DiskStoreImpl store = itr.next(); + for (Iterator itr = diskStores.iterator(); itr.hasNext();) { + DiskStoreImpl store = (DiskStoreImpl) itr.next(); if (store.hasPersistedData()) { if (!foundPersistentData) { createBackupDir(backupDir); @@ -210,10 +211,11 @@ public class BackupManager implements MembershipListener { allowDestroys.countDown(); - for (DiskStoreImpl store : diskStores) { - store.finishBackup(this); - store.getStats().endBackup(); - persistentIds.add(store.getPersistentID()); + for (DiskStore store : diskStores) { + DiskStoreImpl storeImpl = (DiskStoreImpl) store; + storeImpl.finishBackup(this); + storeImpl.getStats().endBackup(); + persistentIds.add(storeImpl.getPersistentID()); } if (foundPersistentData) { @@ -227,7 +229,6 @@ public class BackupManager implements MembershipListener { } } - return persistentIds; } finally { @@ -256,7 +257,7 @@ public class BackupManager implements MembershipListener { FileUtils.copyFile(new File(DistributedSystem.getPropertiesFile()), propertyBackup); } - // TODO sbawaska: should the gfsecurity.properties file be backed up? + // TODO: should the gfsecurity.properties file be backed up? 
} private void backupUserFiles(RestoreScript restoreScript, File backupDir) throws IOException { @@ -330,10 +331,7 @@ public class BackupManager implements MembershipListener { cache.getInternalDistributedSystem().getDistributedMember(); String vmId = memberId.toString(); vmId = cleanSpecialCharacters(vmId); - File backupDir = new File(targetDir, vmId); - - - return backupDir; + return new File(targetDir, vmId); } private void createBackupDir(File backupDir) throws IOException { http://git-wip-us.apache.org/repos/asf/geode/blob/4b489e56/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java index 5a3c002..c9aeaed 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java @@ -26,20 +26,19 @@ import org.apache.geode.cache.snapshot.CacheSnapshotService; import org.apache.geode.cache.snapshot.RegionSnapshotService; import org.apache.geode.cache.snapshot.SnapshotOptions; import org.apache.geode.cache.snapshot.SnapshotOptions.SnapshotFormat; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.snapshot.GFSnapshot.GFSnapshotImporter; import org.apache.geode.internal.i18n.LocalizedStrings; /** * Provides an implementation for cache snapshots. Most of the implementation delegates to * {@link RegionSnapshotService}. 
- * */ public class CacheSnapshotServiceImpl implements CacheSnapshotService { /** the cache */ - private final GemFireCacheImpl cache; + private final InternalCache cache; - public CacheSnapshotServiceImpl(GemFireCacheImpl cache) { + public CacheSnapshotServiceImpl(InternalCache cache) { this.cache = cache; } http://git-wip-us.apache.org/repos/asf/geode/blob/4b489e56/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/GFSnapshot.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/GFSnapshot.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/GFSnapshot.java index 34ddd63..5ba8800 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/GFSnapshot.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/GFSnapshot.java @@ -24,28 +24,24 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.channels.FileChannel; import java.util.Arrays; -import java.util.Collection; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; -import java.util.TreeMap; import org.apache.geode.DataSerializer; -import org.apache.geode.InternalGemFireError; import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.snapshot.SnapshotIterator; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.snapshot.SnapshotPacket.SnapshotRecord; import org.apache.geode.internal.i18n.LocalizedStrings; -import org.apache.geode.pdx.PdxSerializationException; import org.apache.geode.pdx.internal.EnumInfo; import org.apache.geode.pdx.internal.PdxType; import org.apache.geode.pdx.internal.TypeRegistry; /** * Provides support for reading and writing snapshot files. 
- * */ public class GFSnapshot { /** @@ -260,7 +256,7 @@ public class GFSnapshot { // write pdx types try { - GemFireCacheImpl cache = GemFireCacheImpl + InternalCache cache = GemFireCacheImpl .getForPdx("PDX registry is unavailable because the Cache has been closed."); new ExportedRegistry(cache.getPdxRegistry()).toData(dos); } catch (CacheClosedException e) { @@ -400,7 +396,7 @@ public class GFSnapshot { } private TypeRegistry getRegistry() { - GemFireCacheImpl gfc = GemFireCacheImpl.getInstance(); + InternalCache gfc = GemFireCacheImpl.getInstance(); if (gfc != null) { return gfc.getPdxRegistry(); } http://git-wip-us.apache.org/repos/asf/geode/blob/4b489e56/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java index 0a182e6..a82a804 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CachedRegionHelper.java @@ -16,7 +16,6 @@ package org.apache.geode.internal.cache.tier; import org.apache.geode.CancelException; import org.apache.geode.SystemFailure; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; import org.apache.geode.internal.cache.InternalCache; http://git-wip-us.apache.org/repos/asf/geode/blob/4b489e56/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java index aeabc86..7b291da 100644 --- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java @@ -28,6 +28,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.net.SocketCreator; import org.apache.logging.log4j.Logger; @@ -41,7 +42,6 @@ import org.apache.geode.distributed.DistributedSystemDisconnectedException; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ServerLocation; import org.apache.geode.internal.cache.CacheServerImpl; -import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl; import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor; import org.apache.geode.internal.i18n.LocalizedStrings; @@ -291,7 +291,7 @@ public final class InternalClientMembership { public static Map getClientQueueSizes() { Map clientQueueSizes = new HashMap(); - GemFireCacheImpl c = (GemFireCacheImpl) CacheFactory.getAnyInstance(); + InternalCache c = (InternalCache) CacheFactory.getAnyInstance(); if (c == null) // Add a NULL Check return clientQueueSizes; http://git-wip-us.apache.org/repos/asf/geode/blob/4b489e56/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java index ed29472..9114367 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java +++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java @@ -67,12 +67,15 @@ import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.client.internal.PoolImpl; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.cache.wan.GatewayTransportFilter; -import org.apache.geode.distributed.internal.*; -import org.apache.geode.internal.net.SocketCreator; +import org.apache.geode.distributed.internal.DM; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.LonerDistributionManager; +import org.apache.geode.distributed.internal.PooledExecutorWithDMStats; +import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.cache.BucketAdvisor; import org.apache.geode.internal.cache.BucketAdvisor.BucketProfile; -import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.partitioned.AllBucketProfilesUpdateMessage; @@ -83,10 +86,11 @@ import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.LoggingThreadGroup; import org.apache.geode.internal.logging.log4j.LocalizedMessage; +import org.apache.geode.internal.net.SocketCreator; +import org.apache.geode.internal.net.SocketCreatorFactory; import org.apache.geode.internal.security.IntegratedSecurityService; import org.apache.geode.internal.security.SecurableCommunicationChannel; import org.apache.geode.internal.security.SecurityService; -import org.apache.geode.internal.net.SocketCreatorFactory; import org.apache.geode.internal.tcp.ConnectionTable; import org.apache.geode.internal.util.ArrayUtils; @@ -375,9 
+379,9 @@ public class AcceptorImpl extends Acceptor implements Runnable { .getSocketCreatorForComponent(SecurableCommunicationChannel.GATEWAY); } - final GemFireCacheImpl gc; + final InternalCache gc; if (getCachedRegionHelper() != null) { - gc = (GemFireCacheImpl) getCachedRegionHelper().getCache(); + gc = getCachedRegionHelper().getCache(); } else { gc = null; } @@ -655,8 +659,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { + this.localPort + " local port: " + this.serverSock.getLocalPort()); this.selectorThread.start(); } - GemFireCacheImpl myCache = (GemFireCacheImpl) cache; - Set prs = myCache.getPartitionedRegions(); + Set prs = this.cache.getPartitionedRegions(); for (PartitionedRegion pr : prs) { Map profiles = new HashMap(); @@ -957,7 +960,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { { SystemFailure.checkFailure(); // this.cache.getDistributedSystem().getCancelCriterion().checkCancelInProgress(null); - if (((GemFireCacheImpl) this.cache).isClosed()) { // bug 38834 + if (this.cache.isClosed()) { // bug 38834 break; // TODO should just ask cache's CancelCriterion } if (this.cache.getCancelCriterion().isCancelInProgress()) { @@ -1559,9 +1562,8 @@ public class AcceptorImpl extends Acceptor implements Runnable { } private void notifyCacheMembersOfClose() { - GemFireCacheImpl myCache = (GemFireCacheImpl) cache; - if (!myCache.forcedDisconnect()) { - for (PartitionedRegion pr : myCache.getPartitionedRegions()) { + if (!this.cache.forcedDisconnect()) { + for (PartitionedRegion pr : this.cache.getPartitionedRegions()) { Map profiles = new HashMap<>(); // get all local real bucket advisors Map advisors = pr.getRegionAdvisor().getAllBucketAdvisors(); http://git-wip-us.apache.org/repos/asf/geode/blob/4b489e56/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java index d217672..58ba4b3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java @@ -12,13 +12,39 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache.tier.sockets; -import org.apache.geode.*; -import org.apache.geode.cache.*; +import java.io.EOFException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Semaphore; +import java.util.regex.Pattern; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.CopyException; +import org.apache.geode.InternalGemFireError; +import org.apache.geode.SerializationException; +import org.apache.geode.SystemFailure; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheLoaderException; +import org.apache.geode.cache.CacheWriterException; +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionDestroyedException; +import org.apache.geode.cache.TransactionException; import org.apache.geode.cache.persistence.PartitionOfflineException; import org.apache.geode.cache.query.types.CollectionType; import org.apache.geode.distributed.DistributedSystemDisconnectedException; @@ -27,8 +53,21 @@ import 
org.apache.geode.distributed.internal.DistributionStats; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.Assert; import org.apache.geode.internal.Version; -import org.apache.geode.internal.cache.*; +import org.apache.geode.internal.cache.CachedDeserializable; +import org.apache.geode.internal.cache.DistributedRegion; +import org.apache.geode.internal.cache.EntryEventImpl; +import org.apache.geode.internal.cache.EntrySnapshot; +import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.FindVersionTagOperation; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.LocalRegion.NonTXEntry; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionHelper; +import org.apache.geode.internal.cache.TXManagerImpl; +import org.apache.geode.internal.cache.TXStateProxy; +import org.apache.geode.internal.cache.Token; +import org.apache.geode.internal.cache.VersionTagHolder; import org.apache.geode.internal.cache.tier.CachedRegionHelper; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.InterestType; @@ -44,13 +83,6 @@ import org.apache.geode.internal.security.SecurityService; import org.apache.geode.internal.sequencelog.EntryLogger; import org.apache.geode.security.GemFireSecurityException; -import org.apache.logging.log4j.Logger; - -import java.io.*; -import java.util.*; -import java.util.concurrent.Semaphore; -import java.util.regex.Pattern; - public abstract class BaseCommand implements Command { protected static final Logger logger = LogService.getLogger(); @@ -125,7 +157,7 @@ public abstract class BaseCommand implements Command { boolean shouldMasquerade = shouldMasqueradeForTx(msg, servConn); try { if (shouldMasquerade) { - GemFireCacheImpl cache = (GemFireCacheImpl) 
servConn.getCache(); + InternalCache cache = servConn.getCache(); InternalDistributedMember member = (InternalDistributedMember) servConn.getProxyID().getDistributedMember(); TXManagerImpl txMgr = cache.getTxManager(); http://git-wip-us.apache.org/repos/asf/geode/blob/4b489e56/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java index 28d6ae2..e79bfbd 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java @@ -12,7 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. 
*/ - package org.apache.geode.internal.cache.tier.sockets; import static org.apache.geode.distributed.ConfigurationProperties.*; @@ -51,7 +50,6 @@ import org.apache.geode.DataSerializer; import org.apache.geode.Instantiator; import org.apache.geode.InternalGemFireError; import org.apache.geode.StatisticsFactory; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheEvent; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.InterestRegistrationEvent; @@ -79,6 +77,7 @@ import org.apache.geode.distributed.internal.MessageWithReply; import org.apache.geode.distributed.internal.ReplyMessage; import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.internal.ClassLoadUtil; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.statistics.DummyStatisticsFactory; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.InternalInstantiator; @@ -116,7 +115,6 @@ import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.InternalLogWriter; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; -import org.apache.geode.internal.net.SocketCloser; import org.apache.geode.security.AccessControl; import org.apache.geode.security.AuthenticationFailedException; import org.apache.geode.security.AuthenticationRequiredException; @@ -125,7 +123,6 @@ import org.apache.geode.security.AuthenticationRequiredException; * Class CacheClientNotifier works on the server and manages client socket connections * to clients requesting notification of updates and notifies them when updates occur. * - * * @since GemFire 3.2 */ @SuppressWarnings({"synthetic-access", "deprecation"}) @@ -137,15 +134,10 @@ public class CacheClientNotifier { /** * Factory method to construct a CacheClientNotifier CacheClientNotifier instance. 
* - * @param cache The GemFire Cache - * @param acceptorStats - * @param maximumMessageCount - * @param messageTimeToLive - * @param listener - * @param overflowAttributesList + * @param cache The GemFire InternalCache * @return A CacheClientNotifier instance */ - public static synchronized CacheClientNotifier getInstance(Cache cache, + public static synchronized CacheClientNotifier getInstance(InternalCache cache, CacheServerStats acceptorStats, int maximumMessageCount, int messageTimeToLive, ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver) { if (ccnSingleton == null) { @@ -158,13 +150,6 @@ public class CacheClientNotifier { // In this case, the HaContainer should be lazily created here ccnSingleton.initHaContainer(overflowAttributesList); } - // else { - // ccnSingleton.acceptorStats = acceptorStats; - // ccnSingleton.maximumMessageCount = maximumMessageCount; - // ccnSingleton.messageTimeToLive = messageTimeToLive; - // ccnSingleton._connectionListener = listener; - // ccnSingleton.setCache((GemFireCache)cache); - // } return ccnSingleton; } @@ -178,8 +163,6 @@ public class CacheClientNotifier { * @param dos the DataOutputStream to use for writing the message * @param type a byte representing the message type * @param p_msg the message to be written; can be null - * @param clientVersion - * */ private void writeMessage(DataOutputStream dos, byte type, String p_msg, Version clientVersion) throws IOException { @@ -248,35 +231,12 @@ public class CacheClientNotifier { * @param dos the DataOutputStream to use for writing the message * @param type a byte representing the exception type * @param ex the exception to be written; should not be null - * @param clientVersion - * */ private void writeException(DataOutputStream dos, byte type, Exception ex, Version clientVersion) throws IOException { - writeMessage(dos, type, ex.toString(), clientVersion); } - // /** - // * Factory method to return the singleton CacheClientNotifier - // * 
instance. - // * @return the singleton CacheClientNotifier instance - // */ - // public static CacheClientNotifier getInstance() - // { - // return _instance; - // } - - // /** - // * Shuts down the singleton CacheClientNotifier instance. - // */ - // public static void shutdownInstance() - // { - // if (_instance == null) return; - // _instance.shutdown(); - // _instance = null; - // } - /** * Registers a new client updater that wants to receive updates with this server. * @@ -355,12 +315,10 @@ public class CacheClientNotifier { DistributedSystem system = this.getCache().getDistributedSystem(); Properties sysProps = system.getProperties(); String authenticator = sysProps.getProperty(SECURITY_CLIENT_AUTHENTICATOR); - // TODO;hitesh for conflation + if (clientVersion.compareTo(Version.GFE_603) >= 0) { byte[] overrides = HandShake.extractOverrides(new byte[] {(byte) dis.read()}); - clientConflation = overrides[0]; - } else { clientConflation = (byte) dis.read(); } @@ -379,7 +337,6 @@ public class CacheClientNotifier { proxy = registerClient(socket, proxyID, proxy, isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription); - // TODO:hitesh Properties credentials = HandShake.readCredentials(dis, dos, system); if (credentials != null && proxy != null) { if (securityLogWriter.fineEnabled()) { @@ -445,11 +402,9 @@ public class CacheClientNotifier { return; } - this._statistics.endClientRegistration(startTime); } - /** * Registers a new client that wants to receive updates with this server. * @@ -678,7 +633,6 @@ public class CacheClientNotifier { /** * Makes Primary to this CacheClientProxy and start the dispatcher of the CacheClientProxy * - * @param proxyId * @param isClientReady Whether the marker has already been processed. This value helps determine * whether to start the dispatcher. */ @@ -695,9 +649,6 @@ public class CacheClientNotifier { * Then, start or resume the dispatcher. Otherwise, let the clientReady message start the * dispatcher. 
See CacheClientProxy.startOrResumeMessageDispatcher if * (!proxy._messageDispatcher.isAlive()) { - * - * proxy._messageDispatcher._messageQueue.setPrimary(true); proxy._messageDispatcher.start(); - * } */ if (isClientReady || !proxy.isDurable()) { if (logger.isDebugEnabled()) { @@ -713,8 +664,6 @@ public class CacheClientNotifier { /** * Adds or updates entry in the dispatched message map when client sends an ack. * - * @param proxyId - * @param eid * @return success */ public boolean processDispatchedMessage(ClientProxyMembershipID proxyId, EventID eid) { @@ -754,8 +703,6 @@ public class CacheClientNotifier { * Unregisters an existing client from this server. * * @param memberId Uniquely identifies the client - * - * */ public void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) { if (logger.isDebugEnabled()) { @@ -781,13 +728,11 @@ public class CacheClientNotifier { /** * The client represented by the proxyId is ready to receive updates. - * - * @param proxyId */ public void readyForEvents(ClientProxyMembershipID proxyId) { CacheClientProxy proxy = getClientProxy(proxyId); if (proxy == null) { - // @todo log a message + // TODO: log a message } else { // False signifies that a marker message has not already been processed. // Generate and send one. 
@@ -817,7 +762,6 @@ public class CacheClientNotifier { CacheClientNotifier instance = ccnSingleton; if (instance != null) { instance.singletonNotifyClients(event, null); - } } @@ -829,7 +773,6 @@ public class CacheClientNotifier { CacheClientNotifier instance = ccnSingleton; if (instance != null) { instance.singletonNotifyClients(event, cmsg); - } } @@ -839,10 +782,6 @@ public class CacheClientNotifier { FilterInfo filterInfo = event.getLocalFilterInfo(); - // if (_logger.fineEnabled()) { - // _logger.fine("Client dispatcher processing event " + event); - // } - FilterProfile regionProfile = ((LocalRegion) event.getRegion()).getFilterProfile(); if (filterInfo != null) { // if the routing was made using an old profile we need to recompute it @@ -905,7 +844,7 @@ public class CacheClientNotifier { } if (!ids.isEmpty()) { if (isTraceEnabled) { - logger.trace("adding invalidation routing to message for {}" + ids); + logger.trace("adding invalidation routing to message for {}", ids); } clientMessage.addClientInterestList(ids, false); filterClients.addAll(ids); @@ -964,10 +903,8 @@ public class CacheClientNotifier { if (filterInfo.filterProcessedLocally) { removeDestroyTokensFromCqResultKeys(event, filterInfo); } - } - private void removeDestroyTokensFromCqResultKeys(InternalCacheEvent event, FilterInfo filterInfo) { FilterProfile regionProfile = ((LocalRegion) event.getRegion()).getFilterProfile(); @@ -986,12 +923,9 @@ public class CacheClientNotifier { } } - /** * delivers the given message to all proxies for routing. 
The message should already have client * interest established, or override the isClientInterested method to implement its own routing - * - * @param clientMessage */ public static void routeClientMessage(Conflatable clientMessage) { CacheClientNotifier instance = ccnSingleton; @@ -1014,7 +948,7 @@ public class CacheClientNotifier { } } - /* + /** * this is for server side registration of client queue */ public static void routeSingleClientMessage(ClientUpdateMessage clientMessage, @@ -1054,7 +988,6 @@ public class CacheClientNotifier { if (deadProxies != null) { closeDeadProxies(deadProxies, false); } - } /** @@ -1146,7 +1079,6 @@ public class CacheClientNotifier { * @param operation The operation that occurred (e.g. AFTER_CREATE) * @param event The event containing the data to be updated * @return a ClientUpdateMessage - * @throws Exception */ private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent operation, CacheEvent event) throws Exception { @@ -1219,87 +1151,6 @@ public class CacheClientNotifier { || operation == EnumListenerEvent.AFTER_REGION_INVALIDATE; } - // /** - // * Queues the ClientUpdateMessage to be distributed - // * to interested clients. This method is not being used currently. 
- // * @param clientMessage The ClientUpdateMessage to be queued - // */ - // protected void notifyClients(final ClientUpdateMessage clientMessage) - // { - // if (USE_SYNCHRONOUS_NOTIFICATION) - // { - // // Execute the method in the same thread as the caller - // deliver(clientMessage); - // } - // else { - // // Obtain an Executor and use it to execute the method in its own thread - // try - // { - // getExecutor().execute(new Runnable() - // { - // public void run() - // { - // deliver(clientMessage); - // } - // } - // ); - // } catch (InterruptedException e) - // { - // _logger.warning("CacheClientNotifier: notifyClients interrupted", e); - // Thread.currentThread().interrupt(); - // } - // } - // } - - // /** - // * Updates the information this CacheClientNotifier maintains - // * for a given edge client. It is invoked when a edge client re-connects to - // * the server. - // * - // * @param clientHost - // * The host on which the client runs (i.e. the host the - // * CacheClientNotifier uses to communicate with the - // * CacheClientUpdater) This is used with the clientPort to uniquely - // * identify the client - // * @param clientPort - // * The port through which the server communicates with the client - // * (i.e. the port the CacheClientNotifier uses to communicate with - // * the CacheClientUpdater) This is used with the clientHost to - // * uniquely identify the client - // * @param remotePort - // * The port through which the client communicates with the server - // * (i.e. 
the new port the ConnectionImpl uses to communicate with the - // * ServerConnection) - // * @param membershipID - // * Uniquely idenifies the client - // */ - // public void registerClientPort(String clientHost, int clientPort, - // int remotePort, ClientProxyMembershipID membershipID) - // { - // if (_logger.fineEnabled()) - // _logger.fine("CacheClientNotifier: Registering client port: " - // + clientHost + ":" + clientPort + " with remote port " + remotePort - // + " and ID " + membershipID); - // for (Iterator i = getClientProxies().iterator(); i.hasNext();) { - // CacheClientProxy proxy = (CacheClientProxy)i.next(); - // if (_logger.finerEnabled()) - // _logger.finer("CacheClientNotifier: Potential client: " + proxy); - // //if (proxy.representsCacheClientUpdater(clientHost, clientPort)) - // if (proxy.isMember(membershipID)) { - // if (_logger.finerEnabled()) - // _logger - // .finer("CacheClientNotifier: Updating remotePorts since host and port are a match"); - // proxy.addPort(remotePort); - // } - // else { - // if (_logger.finerEnabled()) - // _logger.finer("CacheClientNotifier: Host and port " - // + proxy.getRemoteHostAddress() + ":" + proxy.getRemotePort() - // + " do not match " + clientHost + ":" + clientPort); - // } - // } - // } - /** * Registers client interest in the input region and key. * @@ -1350,23 +1201,9 @@ public class CacheClientNotifier { } } - /* - * protected void addFilterRegisteredClients(String regionName, ClientProxyMembershipID - * membershipID) throws RegionNotFoundException { // Update Regions book keeping. 
LocalRegion - * region = (LocalRegion)this._cache.getRegion(regionName); if (region == null) { //throw new - * AssertionError("Could not find region named '" + regionName + "'"); // @todo: see bug 36805 // - * fix for bug 37979 if (_logger.fineEnabled()) { _logger .fine("CacheClientNotifier: Client " + - * membershipID + " :Throwing RegionDestroyedException as region: " + regionName + - * " is not present."); } throw new RegionDestroyedException("registerInterest failed", - * regionName); } else { region.getFilterProfile().addFilterRegisteredClients(this, membershipID); - * } } - */ - /** * Store region and delta relation * - * @param regionsWithEmptyDataPolicy - * @param regionName * @param regionDataPolicy (0==empty) * @since GemFire 6.1 */ @@ -1457,13 +1294,11 @@ public class CacheClientNotifier { } } - /** * If the conflatable is an instance of HAEventWrapper, and if the corresponding entry is present * in the haContainer, set the reference to the clientUpdateMessage to null and putInProgress flag * to false. Also, if the ref count is zero, then remove the entry from the haContainer. * - * @param conflatable * @since GemFire 5.7 */ private void checkAndRemoveFromClientMsgsRegion(Conflatable conflatable) { @@ -1484,9 +1319,6 @@ public class CacheClientNotifier { } } } - // else { - // This is a replay-of-event case. - // } } else { // This wrapper resides in haContainer. 
wrapper.setClientUpdateMessage(null); @@ -1525,7 +1357,6 @@ public class CacheClientNotifier { return proxy; } - /** * Returns the CacheClientProxy associated to the durableClientId * @@ -1595,10 +1426,6 @@ public class CacheClientNotifier { membershipID); logger.debug("{}::getClientProxySameDS(), Number of proxies in the Cache Clinet Notifier: {}", this, getClientProxies().size()); - /* - * _logger.fine(this + "::getClientProxySameDS(), Proxies in the Cache Clinet Notifier: " + - * getClientProxies()); - */ } CacheClientProxy proxy = null; for (Iterator i = getClientProxies().iterator(); i.hasNext();) { @@ -1618,7 +1445,6 @@ public class CacheClientNotifier { return proxy; } - /** * It will remove the clients connected to the passed acceptorId. If its the only server, shuts * down this instance. @@ -1704,7 +1530,6 @@ public class CacheClientNotifier { } } this.timedOutDurableClientProxies.remove(proxy.getProxyID()); - } protected void addClientInitProxy(CacheClientProxy proxy) throws IOException { @@ -1719,7 +1544,6 @@ public class CacheClientNotifier { return this._initClientProxies.containsKey(proxy.getProxyID()); } - /** * Returns (possibly stale) set of memberIds for all clients being actively notified by this * server. 
@@ -1781,7 +1605,6 @@ public class CacheClientNotifier { * @since GemFire 5.6 */ public boolean hasPrimaryForDurableClient(String durableId) { - for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) { CacheClientProxy proxy = (CacheClientProxy) iter.next(); ClientProxyMembershipID proxyID = proxy.getProxyID(); @@ -1828,7 +1651,6 @@ public class CacheClientNotifier { return false; } - /** * Removes an existing CacheClientProxy from the list of known client proxies * @@ -1840,14 +1662,13 @@ public class CacheClientNotifier { ClientProxyMembershipID client = proxy.getProxyID(); this._clientProxies.remove(client); this._connectionListener.queueRemoved(); - ((GemFireCacheImpl) this.getCache()).cleanupForClient(this, client); + this.getCache().cleanupForClient(this, client); if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) { ClientHealthMonitor chm = ClientHealthMonitor.getInstance(); if (chm != null) { chm.numOfClientsPerVersion.decrementAndGet(proxy.getVersion().ordinal()); } } - } void durableClientTimedOut(ClientProxyMembershipID client) { @@ -1868,17 +1689,6 @@ public class CacheClientNotifier { return Collections.unmodifiableCollection(this._clientProxies.values()); } - // /** - // * Returns the Executor that delivers messages to the - // * CacheClientProxy instances. 
- // * @return the Executor that delivers messages to the - // * CacheClientProxy instances - // */ - // protected Executor getExecutor() - // { - // return _executor; - // } - private void closeAllClientCqs(CacheClientProxy proxy) { CqService cqService = proxy.getCache().getCqService(); if (cqService != null) { @@ -1901,7 +1711,6 @@ public class CacheClientNotifier { /** * Shuts down durable client proxy - * */ public boolean closeDurableClientProxy(String durableClientId) throws CacheException { CacheClientProxy ccp = getClientProxy(durableClientId); @@ -1917,6 +1726,7 @@ public class CacheClientNotifier { if (logger.isDebugEnabled()) { logger.debug("Cannot close running durable client: {}", durableClientId); } + // TODO: never throw an anonymous inner class throw new CacheException("Cannot close a running durable client : " + durableClientId) {}; } } @@ -1960,7 +1770,6 @@ public class CacheClientNotifier { } // for } - /** * Registers a new InterestRegistrationListener with the set of * InterestRegistrationListeners. @@ -2031,13 +1840,13 @@ public class CacheClientNotifier { } /** - * Returns this CacheClientNotifier's Cache. + * Returns this CacheClientNotifier's InternalCache. * - * @return this CacheClientNotifier's Cache + * @return this CacheClientNotifier's InternalCache */ - protected Cache getCache() { // TODO:SYNC: looks wrong + protected InternalCache getCache() { // TODO:SYNC: looks wrong if (this._cache != null && this._cache.isClosed()) { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); if (cache != null) { this._cache = cache; this.logWriter = cache.getInternalLogWriter(); @@ -2068,25 +1877,18 @@ public class CacheClientNotifier { protected void handleInterestEvent(InterestRegistrationEvent event) { LocalRegion region = (LocalRegion) event.getRegion(); region.handleInterestEvent(event); - } /** - * Constructor. 
- * - * @param cache The GemFire Cache - * @param acceptorStats - * @param maximumMessageCount - * @param messageTimeToLive + * @param cache The GemFire InternalCache * @param listener a listener which should receive notifications abouts queues being added or * removed. - * @param overflowAttributesList */ - private CacheClientNotifier(Cache cache, CacheServerStats acceptorStats, int maximumMessageCount, - int messageTimeToLive, ConnectionListener listener, List overflowAttributesList, - boolean isGatewayReceiver) { + private CacheClientNotifier(InternalCache cache, CacheServerStats acceptorStats, + int maximumMessageCount, int messageTimeToLive, ConnectionListener listener, + List overflowAttributesList, boolean isGatewayReceiver) { // Set the Cache - this.setCache((GemFireCacheImpl) cache); + setCache(cache); this.acceptorStats = acceptorStats; this.socketCloser = new SocketCloser(1, 50); // we only need one thread per client and wait 50ms // for close @@ -2111,9 +1913,6 @@ public class CacheClientNotifier { } this._statistics = new CacheClientNotifierStats(factory); - // Initialize the executors - // initializeExecutors(this._logger); - try { this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY)); if (this.logFrequency <= 0) { @@ -2167,13 +1966,6 @@ public class CacheClientNotifier { } } - - /* - * (non-Javadoc) - * - * @see org.apache.geode.distributed.internal.DistributionMessage#process(org.apache.geode. 
- * distributed.internal.DistributionManager) - */ @Override protected void process(DistributionManager dm) { // Get the proxy for the proxy id @@ -2199,11 +1991,6 @@ public class CacheClientNotifier { } } - /* - * (non-Javadoc) - * - * @see org.apache.geode.internal.DataSerializableFixedID#getDSFID() - */ public int getDSFID() { return SERVER_INTEREST_REGISTRATION_MESSAGE; } @@ -2225,107 +2012,8 @@ public class CacheClientNotifier { this.clientMessage = new ClientInterestMessageImpl(); InternalDataSerializer.invokeFromData(this.clientMessage, in); } - } - - // * Initializes the QueuedExecutor and - // PooledExecutor - // * used to deliver messages to CacheClientProxy instances. - // * @param logger The GemFire LogWriterI18n - // */ - // private void initializeExecutors(LogWriterI18n logger) - // { - // // Create the thread groups - // final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup("Cache - // Client Notifier Logger Group", logger); - // final ThreadGroup notifierGroup = - // new ThreadGroup("Cache Client Notifier Group") - // { - // public void uncaughtException(Thread t, Throwable e) - // { - // Thread.dumpStack(); - // loggerGroup.uncaughtException(t, e); - // //CacheClientNotifier.exceptionInThreads = true; - // } - // }; - // - // // Originally set ThreadGroup to be a daemon, but it was causing the - // following - // // exception after five minutes of non-activity (the keep alive time of the - // // threads in the PooledExecutor. 
- // - // // java.lang.IllegalThreadStateException - // // at java.lang.ThreadGroup.add(Unknown Source) - // // at java.lang.Thread.init(Unknown Source) - // // at java.lang.Thread.<init>(Unknown Source) - // // at - // org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier$4.newThread(CacheClientNotifier.java:321) - // // at - // org.apache.edu.oswego.cs.dl.util.concurrent.PooledExecutor.addThread(PooledExecutor.java:512) - // // at - // org.apache.edu.oswego.cs.dl.util.concurrent.PooledExecutor.execute(PooledExecutor.java:888) - // // at - // org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.notifyClients(CacheClientNotifier.java:95) - // // at - // org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:271) - // - // //notifierGroup.setDaemon(true); - // - // if (USE_QUEUED_EXECUTOR) - // createQueuedExecutor(notifierGroup); - // else - // createPooledExecutor(notifierGroup); - // } - - // /** - // * Creates the QueuedExecutor used to deliver messages - // * to CacheClientProxy instances - // * @param notifierGroup The ThreadGroup to which the - // * QueuedExecutor's Threads belong - // */ - // protected void createQueuedExecutor(final ThreadGroup notifierGroup) - // { - // QueuedExecutor queuedExecutor = new QueuedExecutor(new LinkedQueue()); - // queuedExecutor.setThreadFactory(new ThreadFactory() - // { - // public Thread newThread(Runnable command) - // { - // Thread thread = new Thread(notifierGroup, command, "Queued Cache Client - // Notifier"); - // thread.setDaemon(true); - // return thread; - // } - // }); - // _executor = queuedExecutor; - // } - - // /** - // * Creates the PooledExecutor used to deliver messages - // * to CacheClientProxy instances - // * @param notifierGroup The ThreadGroup to which the - // * PooledExecutor's Threads belong - // */ - // protected void createPooledExecutor(final ThreadGroup notifierGroup) - // { - // PooledExecutor pooledExecutor = new PooledExecutor(new - // 
BoundedLinkedQueue(4096), 50); - // pooledExecutor.setMinimumPoolSize(10); - // pooledExecutor.setKeepAliveTime(1000 * 60 * 5); - // pooledExecutor.setThreadFactory(new ThreadFactory() - // { - // public Thread newThread(Runnable command) - // { - // Thread thread = new Thread(notifierGroup, command, "Pooled Cache Client - // Notifier"); - // thread.setDaemon(true); - // return thread; - // } - // }); - // pooledExecutor.createThreads(5); - // _executor = pooledExecutor; - // } - protected void deliverInterestChange(ClientProxyMembershipID proxyID, ClientInterestMessageImpl message) { DM dm = ((InternalDistributedSystem) this.getCache().getDistributedSystem()) @@ -2471,23 +2159,6 @@ public class CacheClientNotifier { */ protected static final int ALL_PORTS = -1; - // /** - // * Whether to synchonously deliver messages to proxies. - // * This is currently hard-coded to true to ensure ordering. - // */ - // protected static final boolean USE_SYNCHRONOUS_NOTIFICATION = - // true; - // Boolean.getBoolean("CacheClientNotifier.USE_SYNCHRONOUS_NOTIFICATION"); - - // /** - // * Whether to use the QueuedExecutor (or the - // * PooledExecutor) to deliver messages to proxies. - // * Currently, delivery is synchronous. No Executor is - // * used. - // */ - // protected static final boolean USE_QUEUED_EXECUTOR = - // Boolean.getBoolean("CacheClientNotifier.USE_QUEUED_EXECUTOR"); - /** * The map of known CacheClientProxy instances. Maps ClientProxyMembershipID to * CacheClientProxy. Note that the keys in this map are not updated when a durable client @@ -2508,11 +2179,11 @@ public class CacheClientNotifier { new HashSet(); /** - * The GemFire Cache. Note that since this is a singleton class you should not use a - * direct reference to _cache in CacheClientNotifier code. Instead, you should always use - * getCache() + * The GemFire InternalCache. Note that since this is a singleton class you should + * not use a direct reference to _cache in CacheClientNotifier code. 
Instead, you should always + * use getCache() */ - private GemFireCacheImpl _cache; + private InternalCache _cache; private InternalLogWriter logWriter; @@ -2543,10 +2214,6 @@ public class CacheClientNotifier { */ private volatile HAContainerWrapper haContainer; - // /** - // * The singleton CacheClientNotifier instance - // */ - // protected static CacheClientNotifier _instance; /** * The size of the server-to-client communication socket buffers. This can be modified using the * BridgeServer.SOCKET_BUFFER_SIZE system property. @@ -2631,9 +2298,8 @@ public class CacheClientNotifier { // lazily initialize haContainer in case this CCN instance was created by a gateway receiver if (overflowAttributesList != null && !HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributesList.get(0))) { - haContainer = new HAContainerRegion(_cache.getRegion( - Region.SEPARATOR + CacheServerImpl.clientMessagesRegion((GemFireCacheImpl) _cache, - (String) overflowAttributesList.get(0), + haContainer = new HAContainerRegion(_cache.getRegion(Region.SEPARATOR + + CacheServerImpl.clientMessagesRegion(_cache, (String) overflowAttributesList.get(0), ((Integer) overflowAttributesList.get(1)).intValue(), ((Integer) overflowAttributesList.get(2)).intValue(), (String) overflowAttributesList.get(3), (Boolean) overflowAttributesList.get(4)))); @@ -2664,11 +2330,10 @@ public class CacheClientNotifier { /** * @param _cache the _cache to set */ - private void setCache(GemFireCacheImpl _cache) { + private void setCache(InternalCache _cache) { this._cache = _cache; } - private class ExpireBlackListTask extends PoolTask { private ClientProxyMembershipID proxyID; http://git-wip-us.apache.org/repos/asf/geode/blob/4b489e56/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java index 18f13f8..bc3765a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java @@ -12,7 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache.tier.sockets; import java.io.ByteArrayInputStream; @@ -39,11 +38,14 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; +import org.apache.logging.log4j.Logger; +import org.apache.shiro.subject.Subject; +import org.apache.shiro.util.ThreadState; + import org.apache.geode.CancelException; import org.apache.geode.DataSerializer; import org.apache.geode.StatisticsFactory; import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.ClientSession; import org.apache.geode.cache.DynamicRegionFactory; @@ -80,8 +82,8 @@ import org.apache.geode.internal.cache.DistributedRegion; import org.apache.geode.internal.cache.EnumListenerEvent; import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.FilterProfile; -import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InterestRegistrationEventImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.StateFlushOperation; @@ -102,16 +104,12 @@ import org.apache.geode.internal.logging.log4j.LogMarker; import org.apache.geode.internal.security.AuthorizeRequestPP; import org.apache.geode.internal.security.SecurityService; import 
org.apache.geode.security.AccessControl; -import org.apache.logging.log4j.Logger; -import org.apache.shiro.subject.Subject; -import org.apache.shiro.util.ThreadState; /** * Class CacheClientProxy represents the server side of the {@link CacheClientUpdater}. * It queues messages to be sent from the server to the client. It then reads those messages from * the queue and sends them to the client. * - * * @since GemFire 4.2 */ @SuppressWarnings("synthetic-access") @@ -153,7 +151,7 @@ public class CacheClientProxy implements ClientSession { /** * The GemFire cache */ - protected final GemFireCacheImpl _cache; + protected final InternalCache _cache; /** * The list of keys that the client represented by this proxy is interested in (stored by region) @@ -345,7 +343,7 @@ public class CacheClientProxy implements ClientSession { Version clientVersion, long acceptorId, boolean notifyBySubscription) throws CacheException { initializeTransientFields(socket, proxyID, isPrimary, clientConflation, clientVersion); this._cacheClientNotifier = ccn; - this._cache = (GemFireCacheImpl) ccn.getCache(); + this._cache = ccn.getCache(); this._maximumMessageCount = ccn.getMaximumMessageCount(); this._messageTimeToLive = ccn.getMessageTimeToLive(); this._acceptorId = acceptorId; @@ -620,7 +618,7 @@ public class CacheClientProxy implements ClientSession { * * @return the GemFire cache */ - public GemFireCacheImpl getCache() { + public InternalCache getCache() { return this._cache; } @@ -2344,7 +2342,7 @@ public class CacheClientProxy implements ClientSession { return this._proxy; } - private GemFireCacheImpl getCache() { + private InternalCache getCache() { return getProxy().getCache(); } @@ -2410,10 +2408,6 @@ public class CacheClientProxy implements ClientSession { Thread.sleep(500); } catch (InterruptedException e) { interrupted = true; - /* - * GemFireCache c = (GemFireCache)_cache; - * c.getDistributedSystem().getCancelCriterion().checkCancelInProgress(e); - */ } catch (CancelException e) 
{ break; } catch (CacheException e) { http://git-wip-us.apache.org/repos/asf/geode/blob/4b489e56/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java index f85ecb4..728abf7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java @@ -12,24 +12,68 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache.tier.sockets; -import org.apache.geode.*; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.net.ConnectException; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ssl.SSLException; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.DataSerializer; +import org.apache.geode.InvalidDeltaException; +import org.apache.geode.StatisticDescriptor; +import org.apache.geode.Statistics; +import org.apache.geode.StatisticsType; +import org.apache.geode.StatisticsTypeFactory; +import org.apache.geode.SystemFailure; import org.apache.geode.cache.EntryNotFoundException; import org.apache.geode.cache.InterestResultPolicy; import org.apache.geode.cache.Operation; import 
org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.client.ServerRefusedConnectionException; -import org.apache.geode.cache.client.internal.*; +import org.apache.geode.cache.client.internal.ClientUpdater; +import org.apache.geode.cache.client.internal.Endpoint; +import org.apache.geode.cache.client.internal.EndpointManager; +import org.apache.geode.cache.client.internal.GetEventValueOp; +import org.apache.geode.cache.client.internal.PoolImpl; +import org.apache.geode.cache.client.internal.QueueManager; import org.apache.geode.cache.query.internal.cq.CqService; import org.apache.geode.distributed.DistributedSystem; -import org.apache.geode.distributed.internal.*; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.DistributionStats; +import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.InternalDistributedSystem.DisconnectListener; +import org.apache.geode.distributed.internal.ServerLocation; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.distributed.internal.membership.MemberAttributes; -import org.apache.geode.internal.*; -import org.apache.geode.internal.cache.*; +import org.apache.geode.internal.Assert; +import org.apache.geode.internal.InternalDataSerializer; +import org.apache.geode.internal.InternalInstantiator; +import org.apache.geode.internal.Version; +import org.apache.geode.internal.cache.ClientServerObserver; +import org.apache.geode.internal.cache.ClientServerObserverHolder; +import org.apache.geode.internal.cache.EntryEventImpl; +import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.LocalRegion; import 
org.apache.geode.internal.cache.tier.CachedRegionHelper; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException; @@ -48,19 +92,6 @@ import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl; import org.apache.geode.security.AuthenticationFailedException; import org.apache.geode.security.AuthenticationRequiredException; import org.apache.geode.security.GemFireSecurityException; -import org.apache.logging.log4j.Logger; - -import javax.net.ssl.SSLException; -import java.io.*; -import java.net.ConnectException; -import java.net.Socket; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; /** * CacheClientUpdater is a thread that processes update messages from a cache server @@ -107,6 +138,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn * The buffer upon which we receive messages */ private final ByteBuffer commBuffer; + private boolean commBufferReleased; private final CCUStats stats; @@ -114,9 +146,9 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn /** * Cache for which we provide service */ - private /* final */ GemFireCacheImpl cache; - private /* final */ CachedRegionHelper cacheHelper; + private /* final */ InternalCache cache; + private /* final */ CachedRegionHelper cacheHelper; /** * Principle flag to signal thread's run loop to terminate @@ -144,7 +176,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn private boolean isOpCompleted; public final static String CLIENT_UPDATER_THREAD_NAME = "Cache Client Updater Thread "; - /* + + /** * to enable test flag */ public static boolean isUsedByTest; @@ -155,20 +188,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn */ public 
static boolean fullValueRequested = false; - // /** - // * True if this thread been initialized. Indicates that the run thread is - // * initialized and ready to process messages - // *

- // * TODO is this still needed? - // *

- // * Accesses synchronized via this - // * - // * @see #notifyInitializationComplete() - // * @see #waitForInitialization() - // */ - // private boolean initialized = false; - - private final ServerLocation location; // TODO - remove these fields @@ -185,7 +204,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn * @return true if cache appears */ private boolean waitForCache() { - GemFireCacheImpl c; + InternalCache cache; long tilt = System.currentTimeMillis() + MAX_CACHE_WAIT * 1000; for (;;) { if (quitting()) { @@ -205,8 +224,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn new Object[] {this, MAX_CACHE_WAIT})); return false; } - c = GemFireCacheImpl.getInstance(); - if (c != null && !c.isClosed()) { + cache = GemFireCacheImpl.getInstance(); + if (cache != null && !cache.isClosed()) { break; } boolean interrupted = Thread.interrupted(); @@ -220,8 +239,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } } } // for - this.cache = c; - this.cacheHelper = new CachedRegionHelper(c); + this.cache = cache; + this.cacheHelper = new CachedRegionHelper(cache); return true; } @@ -270,7 +289,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn OutputStream tmpOut = null; InputStream tmpIn = null; try { - /** Size of the server-to-client communication socket buffers */ + // Size of the server-to-client communication socket buffers int socketBufferSize = Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768).intValue(); @@ -323,7 +342,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn // create a "server" memberId we currently don't know much about the // server. 
// Would be nice for it to send us its member id - // @todo - change the serverId to use the endpoint's getMemberId() which + // TODO: change the serverId to use the endpoint's getMemberId() which // returns a // DistributedMember (once gfecq branch is merged to trunk). MemberAttributes ma = @@ -464,52 +483,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } } - // /** - // * Waits for this thread to be initialized - // * - // * @return true if initialized; false if stopped before init - // */ - // public boolean waitForInitialization() { - // boolean result = false; - // // Yogesh : waiting on this thread object is a bad idea - // // as when thread exits it notifies to the waiting threads. - // synchronized (this) { - // for (;;) { - // if (quitting()) { - // break; - // } - // boolean interrupted = Thread.interrupted(); - // try { - // this.wait(100); // spurious wakeup ok // timed wait, should fix lost notification problem - // rahul. - // } - // catch (InterruptedException e) { - // interrupted = true; - // } - // finally { - // if (interrupted) { - // Thread.currentThread().interrupt(); - // } - // } - // } // while - // // Even if we succeed, there is a risk that we were shut down - // // Can't check for cache; it isn't set yet :-( - // this.system.getCancelCriterion().checkCancelInProgress(null); - // result = this.continueProcessing; - // } // synchronized - // return result; - // } - - // /** - // * @see #waitForInitialization() - // */ - // private void notifyInitializationComplete() { - // synchronized (this) { - // this.initialized = true; - // this.notifyAll(); - // } - // } - /** * Notifies this thread to stop processing */ @@ -1188,21 +1161,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn // message if (region.hasServerProxy()) { return; - - // NOTE: - // As explained in the method description, this code is added as part - // of CQ bug fix. 
Cache server team needs to look into changes relating - // to local region. - // - // Locally invalidate the region - // region.basicBridgeClientInvalidate(callbackArgument, - // proxy.getProcessedMarker()); - - // if (logger.debugEnabled()) { - // logger.debug(toString() + ": Cleared region: " + regionName - // + " callbackArgument: " + callbackArgument); - // } - } } catch (Exception e) { @@ -1241,12 +1199,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn // servers recursively } - // // CALLBACK TESTING PURPOSE ONLY //// + // CALLBACK TESTING PURPOSE ONLY if (PoolImpl.IS_INSTANTIATOR_CALLBACK) { ClientServerObserver bo = ClientServerObserverHolder.getInstance(); bo.afterReceivingFromServer(eventId); } - // ///////////////////////////////////// + } // TODO bug: can the following catch be more specific? catch (Exception e) { @@ -1262,7 +1220,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn final boolean isDebugEnabled = logger.isDebugEnabled(); try { int noOfParts = msg.getNumberOfParts(); - // int numOfClasses = noOfParts - 3; // 1 for ds classname, 1 for ds id and 1 for eventId. 
if (isDebugEnabled) { logger.debug("{}: Received register dataserializer message of parts {}", getName(), noOfParts); @@ -1273,8 +1230,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn String dataSerializerClassName = (String) CacheServerHelper.deserialize(msg.getPart(i).getSerializedForm()); int id = msg.getPart(i + 1).getInt(); - InternalDataSerializer.register(dataSerializerClassName, false, eventId, - null/* context */, id); + InternalDataSerializer.register(dataSerializerClassName, false, eventId, null, id); // distribute is false because we don't want to propagate this to // servers recursively @@ -1295,12 +1251,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } } - // // CALLBACK TESTING PURPOSE ONLY //// + // CALLBACK TESTING PURPOSE ONLY if (PoolImpl.IS_INSTANTIATOR_CALLBACK) { ClientServerObserver bo = ClientServerObserverHolder.getInstance(); bo.afterReceivingFromServer(eventId); } - /////////////////////////////////////// + } // TODO bug: can the following catch be more specific? catch (Exception e) { @@ -1313,12 +1269,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn /** * Processes message to invoke CQ listeners. 
- * - * @param startMessagePart - * @param numCqParts - * @param messageType - * @param key - * @param value */ private int processCqs(Message m, int startMessagePart, int numCqParts, int messageType, Object key, Object value) { @@ -1328,7 +1278,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn private int processCqs(Message m, int startMessagePart, int numCqParts, int messageType, Object key, Object value, byte[] delta, EventID eventId) { - // String[] cqs = new String[numCqs/2]; HashMap cqs = new HashMap(); final boolean isDebugEnabled = logger.isDebugEnabled(); @@ -1496,7 +1445,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn } } - private void handleTombstoneOperation(Message msg) { String regionName = "unknown"; try { // not sure why this isn't done by the caller @@ -1750,10 +1698,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn // originating from the client // and by updating the last update stat, the ServerMonitor is less // likely to send pings... 
- // and the ClientHealthMonitor will cause a disconnect -- mthomas - // 10/18/2006 - - // this._endpoint.setLastUpdate(); + // and the ClientHealthMonitor will cause a disconnect } catch (InterruptedIOException e) { // Per Sun's support web site, this exception seems to be peculiar @@ -1868,13 +1813,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn return socket.getLocalPort(); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.distributed.internal.InternalDistributedSystem.DisconnectListener#onDisconnect - * (org.apache.geode.distributed.internal.InternalDistributedSystem) - */ public void onDisconnect(InternalDistributedSystem sys) { stopUpdater(); } @@ -1884,15 +1822,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn */ private volatile boolean endPointDied = false; - /** - * Returns true if the end point represented by this updater is considered dead. - * - * @return true if {@link #endpoint} died. - */ - public boolean isEndPointDead() { - return this.endPointDied; - } - private void verifySocketBufferSize(int requestedBufferSize, int actualBufferSize, String type) { if (actualBufferSize < requestedBufferSize) { logger.info(LocalizedMessage.create( @@ -1973,11 +1902,9 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn public long startTime() { return DistributionStats.getStatTime(); } - } public boolean isProcessing() { - // TODO Auto-generated method stub return continueProcessing.get(); } } http://git-wip-us.apache.org/repos/asf/geode/blob/4b489e56/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java index e21a834..e0b5ab8 100644 --- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java @@ -12,35 +12,44 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache.tier.sockets; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicIntegerArray; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.CancelException; import org.apache.geode.SystemFailure; -import org.apache.geode.cache.Cache; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.SystemTimer.SystemTimerTask; import org.apache.geode.internal.Version; -import org.apache.geode.internal.cache.*; +import org.apache.geode.internal.cache.CacheClientStatus; +import org.apache.geode.internal.cache.IncomingGatewayStatus; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.TXId; +import org.apache.geode.internal.cache.TXManagerImpl; import org.apache.geode.internal.cache.tier.Acceptor; import org.apache.geode.internal.concurrent.ConcurrentHashSet; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.LoggingThreadGroup; import org.apache.geode.internal.logging.log4j.LocalizedMessage; -import org.apache.logging.log4j.Logger; - -import java.net.InetAddress; -import java.util.*; -import java.util.concurrent.atomic.AtomicIntegerArray; /** * Class ClientHealthMonitor is a server-side singleton that 
monitors the health of * clients by looking at their heartbeats. If too much time elapses between heartbeats, the monitor * determines that the client is dead and interrupts its threads. - * - * + * * @since GemFire 4.2.3 */ public class ClientHealthMonitor { @@ -69,7 +78,7 @@ public class ClientHealthMonitor { /** * THe GemFire Cache */ - final protected Cache _cache; + private final InternalCache _cache; /** * A thread that validates client connections @@ -123,7 +132,7 @@ public class ClientHealthMonitor { * client has died and interrupting its sockets. * @return The singleton ClientHealthMonitor instance */ - public static ClientHealthMonitor getInstance(Cache cache, int maximumTimeBetweenPings, + public static ClientHealthMonitor getInstance(InternalCache cache, int maximumTimeBetweenPings, CacheClientNotifierStats stats) { createInstance(cache, maximumTimeBetweenPings, stats); return _instance; @@ -305,7 +314,7 @@ public class ClientHealthMonitor { scheduledToBeRemovedTx.removeAll(txids); } }; - ((GemFireCacheImpl) this._cache).getCCPTimer().schedule(task, timeout); + this._cache.getCCPTimer().schedule(task, timeout); } } } @@ -384,55 +393,6 @@ public class ClientHealthMonitor { } } - // /** - // * Returns modifiable map (changes do not effect this class) of memberId - // * to connection count. - // */ - // public Map getConnectedClients() { - // Map map = new HashMap(); // KEY=memberId, VALUE=connectionCount (Integer) - // synchronized (_clientThreadsLock) { - // Iterator connectedClients = this._clientThreads.entrySet().iterator(); - // while (connectedClients.hasNext()) { - // Map.Entry entry = (Map.Entry) connectedClients.next(); - // String memberId = (String) entry.getKey();// memberId includes FQDN - // Set connections = (Set) entry.getValue(); - // int socketPort = 0; - // InetAddress socketAddress = null; - // ///* - // Iterator serverConnections = connections.iterator(); - // // Get data from one. 
- // while (serverConnections.hasNext()) { - // ServerConnection sc = (ServerConnection) serverConnections.next(); - // socketPort = sc.getSocketPort(); - // socketAddress = sc.getSocketAddress(); - // break; - // } - // //*/ - // int connectionCount = connections.size(); - // String clientString = null; - // if (socketAddress == null) { - // clientString = "client member id=" + memberId; - // } else { - // clientString = "host name=" + socketAddress.toString() + " host ip=" + - // socketAddress.getHostAddress() + " client port=" + socketPort + " client - // member id=" + memberId; - // } - // map.put(memberId, new Object[] {clientString, new - // Integer(connectionCount)}); - // /* Note: all client addresses are same... - // Iterator serverThreads = ((Set) entry.getValue()).iterator(); - // while (serverThreads.hasNext()) { - // ServerConnection connection = (ServerConnection) serverThreads.next(); - // InetAddress clientAddress = connection.getClientAddress(); - // logger.severe("getConnectedClients: memberId=" + memberId + - // " clientAddress=" + clientAddress + " FQDN=" + - // clientAddress.getCanonicalHostName()); - // }*/ - // } - // } - // return map; - // } - /** * Returns modifiable map (changes do not effect this class) of client membershipID to connection * count. This is different from the map contained in this class as here the key is client @@ -442,7 +402,6 @@ public class ClientHealthMonitor { * @param filterProxies Set identifying the Connection proxies which should be fetched. These * ConnectionProxies may be from same client member or different. If it is null this would * mean to fetch the Connections of all the ConnectionProxy objects. - * */ public Map getConnectedClients(Set filterProxies) { Map map = new HashMap(); // KEY=proxyID, VALUE=connectionCount (Integer) @@ -677,7 +636,6 @@ public class ClientHealthMonitor { return this._clientHeartbeats; } - /** * Shuts down the singleton CacheClientNotifier instance. 
*/ @@ -693,10 +651,9 @@ public class ClientHealthMonitor { * * @param cache The GemFire Cache * @param maximumTimeBetweenPings The maximum time allowed between pings before determining the - * client has died and interrupting its sockets. */ - protected static synchronized void createInstance(Cache cache, int maximumTimeBetweenPings, - CacheClientNotifierStats stats) { + protected static synchronized void createInstance(InternalCache cache, + int maximumTimeBetweenPings, CacheClientNotifierStats stats) { refCount++; if (_instance != null) { return; @@ -710,9 +667,8 @@ public class ClientHealthMonitor { * * @param cache The GemFire Cache * @param maximumTimeBetweenPings The maximum time allowed between pings before determining the - * client has died and interrupting its sockets. */ - private ClientHealthMonitor(Cache cache, int maximumTimeBetweenPings, + private ClientHealthMonitor(InternalCache cache, int maximumTimeBetweenPings, CacheClientNotifierStats stats) { // Set the Cache this._cache = cache; http://git-wip-us.apache.org/repos/asf/geode/blob/4b489e56/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index ecd9c7a..6eadee3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -12,7 +12,6 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. 
*/ - package org.apache.geode.internal.cache.tier.sockets; import static org.apache.geode.distributed.ConfigurationProperties.*; @@ -49,7 +48,7 @@ import org.apache.geode.internal.Assert; import org.apache.geode.internal.HeapDataOutputStream; import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.EventID; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Acceptor; import org.apache.geode.internal.cache.tier.CachedRegionHelper; import org.apache.geode.internal.cache.tier.ClientHandShake; @@ -373,7 +372,7 @@ public class ServerConnection implements Runnable { return getCache().getDistributedSystem(); } - public Cache getCache() { + public InternalCache getCache() { return this.crHelper.getCache(); } @@ -578,7 +577,7 @@ public class ServerConnection implements Runnable { private boolean isFiringMembershipEvents() { return this.acceptor.isRunning() - && !((GemFireCacheImpl) this.acceptor.getCachedRegionHelper().getCache()).isClosed() + && !(this.acceptor.getCachedRegionHelper().getCache()).isClosed() && !acceptor.getCachedRegionHelper().getCache().getCancelCriterion().isCancelInProgress(); } http://git-wip-us.apache.org/repos/asf/geode/blob/4b489e56/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java index 5a02525..1b599e9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java @@ -12,16 +12,13 @@ * or implied. 
See the License for the specific language governing permissions and limitations under * the License. */ -/** - * - */ package org.apache.geode.internal.cache.tier.sockets.command; import java.io.IOException; import org.apache.logging.log4j.Logger; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; import org.apache.geode.internal.cache.tier.sockets.Message; @@ -30,7 +27,6 @@ import org.apache.geode.internal.logging.LogService; import org.apache.geode.pdx.internal.EnumInfo; import org.apache.geode.pdx.internal.TypeRegistry; - public class AddPdxEnum extends BaseCommand { private static final Logger logger = LogService.getLogger(); @@ -56,7 +52,7 @@ public class AddPdxEnum extends BaseCommand { int enumId = msg.getPart(1).getInt(); try { - GemFireCacheImpl cache = (GemFireCacheImpl) servConn.getCache(); + InternalCache cache = servConn.getCache(); TypeRegistry registry = cache.getPdxRegistry(); registry.addRemoteEnum(enumId, enumInfo); } catch (Exception e) {