Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B172B200C25 for ; Thu, 9 Feb 2017 23:29:09 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id AEA0C160B4B; Thu, 9 Feb 2017 22:29:09 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 63247160B70 for ; Thu, 9 Feb 2017 23:29:08 +0100 (CET) Received: (qmail 85816 invoked by uid 500); 9 Feb 2017 22:29:07 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 85512 invoked by uid 99); 9 Feb 2017 22:29:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Feb 2017 22:29:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1ADF1E007F; Thu, 9 Feb 2017 22:29:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bschuchardt@apache.org To: commits@geode.apache.org Date: Thu, 09 Feb 2017 22:29:12 -0000 Message-Id: <6f08e06aacfa42bc975163282018f7e7@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [6/7] geode git commit: [GEODE-2324] keep private variables private. archived-at: Thu, 09 Feb 2017 22:29:09 -0000 [GEODE-2324] keep private variables private. * Update AcceptorImplDUnit test, * Refactor AcceptorImpl.close into multiple methods. * Remove a commented-out method. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d73ec978 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d73ec978 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d73ec978 Branch: refs/heads/develop Commit: d73ec978476ec4bf835c38d1713e14b48324515f Parents: 6bed282 Author: Galen O'Sullivan Authored: Wed Jan 25 11:30:07 2017 -0800 Committer: Bruce Schuchardt Committed: Thu Feb 9 14:28:48 2017 -0800 ---------------------------------------------------------------------- .../cache/tier/sockets/AcceptorImpl.java | 202 +++++++----------- .../sockets/command/AcceptorImplObserver.java | 29 --- .../tier/sockets/AcceptorImplDUnitTest.java | 205 +++++++++++++------ .../tier/sockets/AcceptorImplJUnitTest.java | 42 ++-- 4 files changed, 243 insertions(+), 235 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/d73ec978/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java index 1bca1fd..5fa8096 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java @@ -57,7 +57,6 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLException; -import org.apache.geode.internal.cache.tier.sockets.command.AcceptorImplObserver; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; @@ -207,7 +206,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { public final AtomicInteger clientServerCnxCount = new AtomicInteger(); /** Has this acceptor been shut down */ - private volatile boolean shutdown = false; + private volatile boolean shutdownStarted = false; /** The thread that runs the acceptor */ private Thread thread = null; @@ -271,33 +270,6 @@ public class AcceptorImpl extends Acceptor implements Runnable { private SecurityService securityService = IntegratedSecurityService.getSecurityService(); - // Assumed non-null. Do not set this to null. - private static AcceptorImplObserver acceptorImplObserver_do_not_access_directly = - new AcceptorImplObserver() { - @Override - public void beforeClose(AcceptorImpl acceptorImpl) {} - - @Override - public void normalCloseTermination(AcceptorImpl acceptorImpl) {} - - @Override - public void afterClose(AcceptorImpl acceptorImpl) {} - }; - - private static AcceptorImplObserver getAcceptorImplObserver() { - synchronized (AcceptorImpl.class) { - return acceptorImplObserver_do_not_access_directly; - } - } - - public static void setObserver_TESTONLY(AcceptorImplObserver observer) { - synchronized (AcceptorImpl.class) { - if (observer != null) { - acceptorImplObserver_do_not_access_directly = observer; - } - } - } - /** * Initializes this acceptor thread to listen for connections on the given port. * @@ -1551,19 +1523,17 @@ public class AcceptorImpl extends Acceptor implements Runnable { @Override public boolean isRunning() { - return !this.shutdown; + return !this.shutdownStarted; } @Override public void close() { - AcceptorImplObserver acceptorImplObserver = getAcceptorImplObserver(); try { synchronized (syncLock) { - acceptorImplObserver.beforeClose(this); if (!isRunning()) { return; } - this.shutdown = true; + this.shutdownStarted = true; logger.info(LocalizedMessage.create( LocalizedStrings.AcceptorImpl_CACHE_SERVER_ON_PORT_0_IS_SHUTTING_DOWN, this.localPort)); if (this.thread != null) { @@ -1572,81 +1542,87 @@ public class AcceptorImpl extends Acceptor implements Runnable { try { this.serverSock.close(); } catch (IOException ignore) { - // Well, we tried. Continue shutting down. } + crHelper.setShutdown(true); // set this before shutting down the pool - if (isSelector()) { - this.hsTimer.cancel(); - if (this.tmpSel != null) { - try { - this.tmpSel.close(); - } catch (IOException ignore) { - } - } - try { - wakeupSelector(); - this.selector.close(); - } catch (IOException ignore) { - } - if (this.selectorThread != null) { - this.selectorThread.interrupt(); - } - this.commBufferQueue.clear(); - } + shutdownSelectorIfIsSelector(); ClientHealthMonitor.shutdownInstance(); shutdownSCs(); this.clientNotifier.shutdown(this.acceptorId); - this.pool.shutdown(); - - try { - if (!this.pool.awaitTermination(PoolImpl.SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) { - logger.warn(LocalizedMessage.create( - LocalizedStrings.PoolImpl_TIMEOUT_WAITING_FOR_BACKGROUND_TASKS_TO_COMPLETE)); - this.pool.shutdownNow(); - } - } catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); - this.pool.shutdownNow(); - } - this.hsPool.shutdownNow(); + shutdownPools(); this.stats.close(); - GemFireCacheImpl myCache = (GemFireCacheImpl) cache; - if (!myCache.forcedDisconnect()) { - Set prs = myCache.getPartitionedRegions(); - for (PartitionedRegion pr : prs) { - Map profiles = - new HashMap(); - // get all local real bucket advisors - Map advisors = pr.getRegionAdvisor().getAllBucketAdvisors(); - for (Map.Entry entry : advisors.entrySet()) { - BucketAdvisor advisor = entry.getValue(); - BucketProfile bp = (BucketProfile) advisor.createProfile(); - advisor.updateServerBucketProfile(bp); - profiles.put(entry.getKey(), bp); - } - Set receipients = new HashSet(); - receipients = pr.getRegionAdvisor().adviseAllPRNodes(); - // send it to all in one messgae - ReplyProcessor21 reply = AllBucketProfilesUpdateMessage.send(receipients, - pr.getDistributionManager(), pr.getPRId(), profiles, true); - if (reply != null) { - reply.waitForRepliesUninterruptibly(); - } - - if (logger.isDebugEnabled()) { - logger.debug("sending messages to all peers for removing this server.."); - } - } - } - acceptorImplObserver.normalCloseTermination(this); + notifyCacheMembersOfClose(); } // synchronized } catch (RuntimeException e) {/* ignore and log */ logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED), e); - } finally { - acceptorImplObserver.afterClose(this); } } + private void notifyCacheMembersOfClose() { + GemFireCacheImpl myCache = (GemFireCacheImpl) cache; + if (!myCache.forcedDisconnect()) { + for (PartitionedRegion pr : myCache.getPartitionedRegions()) { + Map profiles = new HashMap<>(); + // get all local real bucket advisors + Map advisors = pr.getRegionAdvisor().getAllBucketAdvisors(); + for (Map.Entry entry : advisors.entrySet()) { + BucketAdvisor advisor = entry.getValue(); + BucketProfile bp = (BucketProfile) advisor.createProfile(); + advisor.updateServerBucketProfile(bp); + profiles.put(entry.getKey(), bp); + } + + Set recipients = pr.getRegionAdvisor().adviseAllPRNodes(); + // send it to all in one message + ReplyProcessor21 reply = AllBucketProfilesUpdateMessage.send(recipients, + pr.getDistributionManager(), pr.getPRId(), profiles, true); + if (reply != null) { + reply.waitForRepliesUninterruptibly(); + } + + if (logger.isDebugEnabled()) { + logger.debug("sending messages to all peers for removing this server.."); + } + } + } + } + + private void shutdownSelectorIfIsSelector() { + if (isSelector()) { + this.hsTimer.cancel(); + if (this.tmpSel != null) { + try { + this.tmpSel.close(); + } catch (IOException ignore) { + } + } + try { + wakeupSelector(); + this.selector.close(); + } catch (IOException ignore) { + } + if (this.selectorThread != null) { + this.selectorThread.interrupt(); + } + this.commBufferQueue.clear(); + } + } + + private void shutdownPools() { + this.pool.shutdown(); + try { + if (!this.pool.awaitTermination(PoolImpl.SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) { + logger.warn(LocalizedMessage + .create(LocalizedStrings.PoolImpl_TIMEOUT_WAITING_FOR_BACKGROUND_TASKS_TO_COMPLETE)); + this.pool.shutdownNow(); + } + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + this.pool.shutdownNow(); + } + this.hsPool.shutdownNow(); + } + private void shutdownSCs() { // added to fix part 2 of bug 37351. synchronized (this.allSCsLock) { @@ -1657,36 +1633,12 @@ public class AcceptorImpl extends Acceptor implements Runnable { } } + public boolean isShutdownProperly() { + return !isRunning() && (selectorThread == null || !selectorThread.isAlive()) + && (pool == null || pool.isShutdown()) && (hsPool == null || hsPool.isShutdown()) + && (selector == null || !selector.isOpen()); + } - // protected InetAddress getBindAddress() { - // return this.bindAddress; - // } - - // /** - // * Calculates the bind address based on gemfire.properties. - // * Returns null if no bind address is configured. - // * @since GemFire 5.7 - // */ - // public static InetAddress calcBindAddress(Cache cache) throws IOException { - // InternalDistributedSystem system = (InternalDistributedSystem)cache - // .getDistributedSystem(); - // DistributionConfig config = system.getConfig(); - // InetAddress address = null; - - // // Get the server-bind-address. If it is not null, use it. - // // If it is null, get the bind-address. If it is not null, use it. - // // Otherwise set default. - // String serverBindAddress = config.getServerBindAddress(); - // if (serverBindAddress != null && serverBindAddress.length() > 0) { - // address = InetAddress.getByName(serverBindAddress); - // } else { - // String bindAddress = config.getBindAddress(); - // if (bindAddress != null && bindAddress.length() > 0) { - // address = InetAddress.getByName(bindAddress); - // } - // } - // return address; - // } /** * @param bindName the ip address or host name that this acceptor should bind to. If null or "" * then calculate it. http://git-wip-us.apache.org/repos/asf/geode/blob/d73ec978/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java deleted file mode 100644 index 3d02878..0000000 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.geode.internal.cache.tier.sockets.command; - -import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl; - -/** - * AcceptorImplObserver is an observer/visitor for AcceptorImpl that is used for testing. - */ -public interface AcceptorImplObserver { - void beforeClose(AcceptorImpl acceptorImpl); - - void normalCloseTermination(AcceptorImpl acceptorImpl); - - void afterClose(AcceptorImpl acceptorImpl); -} http://git-wip-us.apache.org/repos/asf/geode/blob/d73ec978/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java index ca8592f..810aabc 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java @@ -15,9 +15,10 @@ package org.apache.geode.internal.cache.tier.sockets; -import org.apache.geode.cache.Cache; +import com.jayway.awaitility.Awaitility; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.GemFireCache; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionFactory; import org.apache.geode.cache.RegionShortcut; @@ -27,22 +28,21 @@ import org.apache.geode.cache.client.ClientRegionShortcut; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.cache.util.CacheWriterAdapter; import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.internal.cache.CacheServerImpl; +import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.internal.cache.tier.sockets.command.AcceptorImplObserver; import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.ThreadUtils; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.categories.DistributedTest; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; -import java.util.Arrays; +import java.util.List; import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; import static org.junit.Assert.*; @@ -52,44 +52,79 @@ import static org.junit.Assert.*; */ @Category(DistributedTest.class) public class AcceptorImplDUnitTest extends JUnit4DistributedTestCase { - private static Cache cache; public AcceptorImplDUnitTest() { super(); } - @Override - public void postTearDown() throws Exception { - if (cache != null) { - cache.close(); - cache = null; + // SleepyCacheWriter will block indefinitely. + // Anyone who has a handle on the SleepyCacheWriter can interrupt it by calling wakeUp. + class SleepyCacheWriter extends CacheWriterAdapter { + private boolean setOnStart; + private boolean setOnInterrupt; + private boolean stopWaiting; + // locks the above three booleans. + private final Object lock = new Object(); + + public void notifyStart() { + synchronized (lock) { + setOnStart = true; + } + } + + public boolean isStarted() { + synchronized (lock) { + return setOnStart; + } + } + + public void notifyInterrupt() { + synchronized (lock) { + setOnInterrupt = true; + } + } + + public boolean isInterrupted() { + synchronized (lock) { + return setOnInterrupt; + } + } + + public void stopWaiting() { + synchronized (lock) { + this.stopWaiting = true; + lock.notify(); + } + } + + public boolean isReadyToQuit() { + synchronized (lock) { + return stopWaiting; + } } - super.postTearDown(); - } - public static class SleepyCacheWriter extends CacheWriterAdapter { + SleepyCacheWriter() {} + @Override public void beforeCreate(EntryEvent event) { - while (true) { - System.out.println("Sleeping a long time."); + System.out.println("Sleeping a long time."); + notifyStart(); + while (!isReadyToQuit()) { try { - Thread.sleep(100000000); - } catch (InterruptedException ignore) { + synchronized (lock) { + lock.wait(); + } + } catch (InterruptedException ex) { + notifyInterrupt(); } } + if (isInterrupted()) { + Thread.currentThread().interrupt(); + } } } /** - * Dump threads to standard out. For debugging. - */ - private void dumpThreads() { - ThreadMXBean bean = ManagementFactory.getThreadMXBean(); - ThreadInfo[] infos = bean.dumpAllThreads(true, true); - System.out.println("infos = " + Arrays.toString(infos)); - } - - /** * GEODE-2324. There was a bug where, due to an uncaught exception, `AcceptorImpl.close()` was * short-circuiting and failing to clean up properly. * @@ -100,40 +135,19 @@ public class AcceptorImplDUnitTest extends JUnit4DistributedTestCase { * since the fields are private) and implementation-dependent. */ @Test - public void testShutdownCatchesException() throws Exception { + public void testAcceptorImplCloseCleansUpWithHangingConnection() throws Exception { final String hostname = Host.getHost(0).getHostName(); final VM clientVM = Host.getHost(0).getVM(0); - // AtomicBooleans can be set from wherever they are, including an anonymous class or other - // thread. - AtomicBoolean terminatedNormally = new AtomicBoolean(false); - AtomicBoolean passedPostConditions = new AtomicBoolean(false); - Properties props = new Properties(); props.setProperty(MCAST_PORT, "0"); - AcceptorImpl.setObserver_TESTONLY(new AcceptorImplObserver() { - @Override - public void beforeClose(AcceptorImpl acceptorImpl) { - Thread.currentThread().interrupt(); - } - - @Override - public void normalCloseTermination(AcceptorImpl acceptorImpl) { - terminatedNormally.set(true); - } - - @Override - public void afterClose(AcceptorImpl acceptorImpl) { - passedPostConditions.set(!acceptorImpl.isRunning()); - } - }); - try (InternalCache cache = (InternalCache) new CacheFactory(props).create()) { RegionFactory regionFactory = cache.createRegionFactory(RegionShortcut.PARTITION); - regionFactory.setCacheWriter(new SleepyCacheWriter<>()); + SleepyCacheWriter sleepyCacheWriter = new SleepyCacheWriter<>(); + regionFactory.setCacheWriter(sleepyCacheWriter); final CacheServer server = cache.addCacheServer(); final int port = AvailablePortHelper.getRandomAvailableTCPPort(); @@ -142,32 +156,93 @@ public class AcceptorImplDUnitTest extends JUnit4DistributedTestCase { regionFactory.create("region1"); + assertTrue(cache.isServer()); + assertFalse(cache.isClosed()); + + Awaitility.await("Acceptor is up and running").atMost(10, SECONDS) + .until(() -> getAcceptorImplFromCache(cache) != null); + AcceptorImpl acceptorImpl = getAcceptorImplFromCache(cache); + + clientVM.invokeAsync(() -> { + // System.setProperty("gemfire.PoolImpl.TRY_SERVERS_ONCE", "true"); ClientCacheFactory clientCacheFactory = new ClientCacheFactory(); clientCacheFactory.addPoolServer(hostname, port); + clientCacheFactory.setPoolReadTimeout(5000); + clientCacheFactory.setPoolRetryAttempts(1); + clientCacheFactory.setPoolMaxConnections(1); + clientCacheFactory.setPoolFreeConnectionTimeout(1000); ClientCache clientCache = clientCacheFactory.create(); Region clientRegion1 = clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create("region1"); clientRegion1.put("foo", "bar"); }); + Awaitility.await("Cache writer starts").atMost(10, SECONDS) + .until(sleepyCacheWriter::isStarted); + cache.close(); - dumpThreads(); - assertTrue(terminatedNormally.get()); - assertTrue(passedPostConditions.get()); + Awaitility.await("Cache writer interrupted").atMost(10, SECONDS) + .until(sleepyCacheWriter::isInterrupted); - // cleanup. - AcceptorImpl.setObserver_TESTONLY(new AcceptorImplObserver() { - @Override - public void beforeClose(AcceptorImpl acceptorImpl) {} + sleepyCacheWriter.stopWaiting(); - @Override - public void normalCloseTermination(AcceptorImpl acceptorImpl) {} + Awaitility.await("Acceptor shuts down properly").atMost(10, SECONDS) + .until(() -> acceptorImpl.isShutdownProperly()); - @Override - public void afterClose(AcceptorImpl acceptorImpl) {} - }); + ThreadUtils.dumpMyThreads(); // for debugging. + + regionFactory.setCacheWriter(null); + } + } + + + @Test + public void testAcceptorImplCloseCleansUp() throws Exception { + Properties props = new Properties(); + props.setProperty(MCAST_PORT, "0"); + + try (InternalCache cache = (InternalCache) new CacheFactory(props).create()) { + RegionFactory regionFactory = + cache.createRegionFactory(RegionShortcut.PARTITION); + + final CacheServer server = cache.addCacheServer(); + final int port = AvailablePortHelper.getRandomAvailableTCPPort(); + server.setPort(port); + server.start(); + + regionFactory.create("region1"); + + assertTrue(cache.isServer()); + assertFalse(cache.isClosed()); + Awaitility.await("Acceptor is up and running").atMost(10, SECONDS) + .until(() -> getAcceptorImplFromCache(cache) != null); + + AcceptorImpl acceptorImpl = getAcceptorImplFromCache(cache); + + cache.close(); + Awaitility.await("Acceptor shuts down properly").atMost(10, SECONDS) + .until(acceptorImpl::isShutdownProperly); + + assertTrue(cache.isClosed()); + assertFalse(acceptorImpl.isRunning()); + } + } + + /** + * + * @param cache + * @return the cache's Acceptor, if there is exactly one CacheServer. Otherwise null. + */ + public AcceptorImpl getAcceptorImplFromCache(GemFireCache cache) { + GemFireCacheImpl gemFireCache = (GemFireCacheImpl) cache; + List cacheServers = gemFireCache.getCacheServers(); + if (cacheServers.size() != 1) { + return null; } + + CacheServerImpl cacheServerImpl = (CacheServerImpl) cacheServers.get(0); + return cacheServerImpl.getAcceptor(); } } http://git-wip-us.apache.org/repos/asf/geode/blob/d73ec978/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java index 58c2157..7aa11b7 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java @@ -15,13 +15,9 @@ package org.apache.geode.internal.cache.tier.sockets; import org.apache.geode.cache.AttributesFactory; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionFactory; -import org.apache.geode.cache.client.ClientCache; -import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.client.ServerRefusedConnectionException; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.DistributedSystem; @@ -46,16 +42,30 @@ import java.net.BindException; import java.net.Socket; import java.util.Collections; import java.util.Properties; -import java.util.Set; -import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; -import static org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID.system; import static org.junit.Assert.*; @Category({IntegrationTest.class, ClientServerTest.class}) public class AcceptorImplJUnitTest { + DistributedSystem system; + InternalCache cache; + + @Before + public void setUp() throws Exception { + Properties p = new Properties(); + p.setProperty(MCAST_PORT, "0"); + this.system = DistributedSystem.connect(p); + this.cache = (InternalCache) CacheFactory.create(system); + } + + @After + public void tearDown() throws Exception { + this.cache.close(); + this.system.disconnect(); + } + /* * Test method for 'org.apache.geode.internal.cache.tier.sockets.AcceptorImpl(int, int, boolean, * int, Cache)' @@ -64,16 +74,15 @@ public class AcceptorImplJUnitTest { @Test public void testConstructor() throws CacheException, IOException { AcceptorImpl a1 = null, a2 = null, a3 = null; - Properties props = new Properties(); - props.setProperty(MCAST_PORT, "0"); - try (InternalCache cache = (InternalCache) new CacheFactory(props).create()) { + try { final int[] freeTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2); int port1 = freeTCPPorts[0]; int port2 = freeTCPPorts[1]; + try { new AcceptorImpl(port1, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, - CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, + CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, AcceptorImpl.MINIMUM_MAX_CONNECTIONS - 1, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY); @@ -83,7 +92,7 @@ public class AcceptorImplJUnitTest { try { new AcceptorImpl(port2, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, - CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, 0, + CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, 0, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY); @@ -93,12 +102,12 @@ public class AcceptorImplJUnitTest { try { a1 = new AcceptorImpl(port1, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, - CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, + CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY); a2 = new AcceptorImpl(port1, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, - CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, + CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY); @@ -107,12 +116,13 @@ public class AcceptorImplJUnitTest { } a3 = new AcceptorImpl(port2, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, - CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, + CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY); assertEquals(port2, a3.getPort()); - InternalDistributedSystem isystem = (InternalDistributedSystem) cache.getDistributedSystem(); + InternalDistributedSystem isystem = + (InternalDistributedSystem) this.cache.getDistributedSystem(); DistributionConfig config = isystem.getConfig(); String bindAddress = config.getBindAddress(); if (bindAddress == null || bindAddress.length() <= 0) {