Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0A58918A8A for ; Sat, 20 Feb 2016 00:00:32 +0000 (UTC) Received: (qmail 44370 invoked by uid 500); 20 Feb 2016 00:00:32 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 44338 invoked by uid 500); 20 Feb 2016 00:00:31 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 44329 invoked by uid 99); 20 Feb 2016 00:00:31 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 20 Feb 2016 00:00:31 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 519551A1091 for ; Sat, 20 Feb 2016 00:00:31 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.549 X-Spam-Level: X-Spam-Status: No, score=-3.549 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.329] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id k2oaPQX6XyEi for ; Fri, 19 Feb 2016 23:59:53 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 369DC60D11 for ; Fri, 19 Feb 2016 23:59:22 +0000 (UTC) Received: (qmail 38475 invoked by uid 99); 19 Feb 
2016 23:59:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Feb 2016 23:59:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9533DE0577; Fri, 19 Feb 2016 23:59:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: upthewaterspout@apache.org To: commits@geode.incubator.apache.org Date: Fri, 19 Feb 2016 23:59:52 -0000 Message-Id: <1fc1e568e6144b1a8c38e51e2d377a05@git.apache.org> In-Reply-To: <9c47094a60244531a8ad41691b717ce6@git.apache.org> References: <9c47094a60244531a8ad41691b717ce6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [34/51] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917 http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java index cd4f3e4,0000000..000120b mode 100644,000000..100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java @@@ -1,821 -1,0 +1,832 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.CancelCriterion; +import com.gemstone.gemfire.GemFireIOException; +import com.gemstone.gemfire.InternalGemFireError; +import com.gemstone.gemfire.InvalidValueException; +import com.gemstone.gemfire.cache.AttributesFactory; +import com.gemstone.gemfire.cache.ClientSession; +import com.gemstone.gemfire.cache.DataPolicy; +import com.gemstone.gemfire.cache.DiskStore; +import com.gemstone.gemfire.cache.DiskStoreFactory; +import com.gemstone.gemfire.cache.DynamicRegionFactory; +import com.gemstone.gemfire.cache.EvictionAction; +import com.gemstone.gemfire.cache.ExpirationAction; +import com.gemstone.gemfire.cache.ExpirationAttributes; +import com.gemstone.gemfire.cache.InterestRegistrationListener; +import com.gemstone.gemfire.cache.RegionAttributes; +import com.gemstone.gemfire.cache.RegionExistsException; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.cache.server.CacheServer; +import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig; +import com.gemstone.gemfire.cache.server.ServerLoadProbe; +import com.gemstone.gemfire.cache.server.internal.LoadMonitor; +import 
com.gemstone.gemfire.cache.wan.GatewayTransportFilter; +import com.gemstone.gemfire.distributed.DistributedMember; +import com.gemstone.gemfire.distributed.internal.DM; +import com.gemstone.gemfire.distributed.internal.DistributionAdvisee; +import com.gemstone.gemfire.distributed.internal.DistributionAdvisor; +import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.distributed.internal.ResourceEvent; +import com.gemstone.gemfire.distributed.internal.ServerLocation; +import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes; +import com.gemstone.gemfire.internal.Assert; +import com.gemstone.gemfire.internal.OSProcess; +import com.gemstone.gemfire.internal.admin.ClientHealthMonitoringRegion; +import com.gemstone.gemfire.internal.cache.CacheServerAdvisor.CacheServerProfile; +import com.gemstone.gemfire.internal.cache.ha.HARegionQueue; +import com.gemstone.gemfire.internal.cache.tier.Acceptor; +import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier; +import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; +import com.gemstone.gemfire.management.membership.ClientMembership; +import com.gemstone.gemfire.management.membership.ClientMembershipListener; + +/** + * An implementation of the CacheServer interface that delegates + * most of the heavy lifting to an {@link Acceptor}. 
+ * + * @author David Whitlock + * @since 4.0 + */ +@SuppressWarnings("deprecation") +public class CacheServerImpl + extends AbstractCacheServer + implements DistributionAdvisee { + + private static final Logger logger = LogService.getLogger(); + + private static final int FORCE_LOAD_UPDATE_FREQUENCY= Integer.getInteger("gemfire.BridgeServer.FORCE_LOAD_UPDATE_FREQUENCY", 10).intValue(); + + /** The acceptor that does the actual serving */ + private volatile AcceptorImpl acceptor; + + /** + * The advisor used by this cache server. + * @since 5.7 + */ + private volatile CacheServerAdvisor advisor; + + /** + * The monitor used to monitor load on this + * bridge server and distribute load to the locators + * @since 5.7 + */ + private volatile LoadMonitor loadMonitor; + + /** + * boolean that represents whether this server is a GatewayReceiver or a simple BridgeServer + */ + private boolean isGatewayReceiver; + + private List gatewayTransportFilters = Collections.EMPTY_LIST; + ++ /** is this a server created by a launcher as opposed to by an application or XML? */ ++ private boolean isDefaultServer; ++ + /** + * Needed because this guy is an advisee + * @since 5.7 + */ + private int serialNumber; // changed on each start + + public static final boolean ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE = + Boolean.getBoolean("gemfire.cache-server.enable-notify-by-subscription-false"); + + + // ////////////////////// Constructors ////////////////////// + + /** + * Creates a new BridgeServerImpl that serves the contents of + * the give Cache. It has the default configuration. + */ + public CacheServerImpl(GemFireCacheImpl cache, boolean isGatewayReceiver) { + super(cache); + this.isGatewayReceiver = isGatewayReceiver; + } + + // //////////////////// Instance Methods /////////////////// + + public CancelCriterion getCancelCriterion() { + return cache.getCancelCriterion(); + } + + /** + * Checks to see whether or not this bridge server is running. 
If so, an + * {@link IllegalStateException} is thrown. + */ + private void checkRunning() { + if (this.isRunning()) { + throw new IllegalStateException(LocalizedStrings.CacheServerImpl_A_CACHE_SERVERS_CONFIGURATION_CANNOT_BE_CHANGED_ONCE_IT_IS_RUNNING.toLocalizedString()); + } + } + + public boolean isGatewayReceiver() { + return this.isGatewayReceiver; + } + + @Override + public int getPort() { + if (this.acceptor != null) { + return this.acceptor.getPort(); + } + else { + return super.getPort(); + } + } + + @Override + public void setPort(int port) { + checkRunning(); + super.setPort(port); + } + + @Override + public void setBindAddress(String address) { + checkRunning(); + super.setBindAddress(address); + } + @Override + public void setHostnameForClients(String name) { + checkRunning(); + super.setHostnameForClients(name); + } + + @Override + public void setMaxConnections(int maxCon) { + checkRunning(); + super.setMaxConnections(maxCon); + } + + @Override + public void setMaxThreads(int maxThreads) { + checkRunning(); + super.setMaxThreads(maxThreads); + } + + @Override + public void setNotifyBySubscription(boolean b) { + checkRunning(); + if (CacheServerImpl.ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE) { + this.notifyBySubscription = b; + } + } + + @Override + public void setMaximumMessageCount(int maximumMessageCount) { + checkRunning(); + super.setMaximumMessageCount(maximumMessageCount); + } + + @Override + public void setSocketBufferSize(int socketBufferSize) { + this.socketBufferSize = socketBufferSize; + } + + @Override + public int getSocketBufferSize() { + return this.socketBufferSize; + } + + @Override + public void setMaximumTimeBetweenPings(int maximumTimeBetweenPings) { + this.maximumTimeBetweenPings = maximumTimeBetweenPings; + } + + @Override + public int getMaximumTimeBetweenPings() { + return this.maximumTimeBetweenPings; + } + + + @Override + public void setLoadPollInterval(long loadPollInterval) { + checkRunning(); + 
super.setLoadPollInterval(loadPollInterval); + } + + @Override + public int getMaximumMessageCount() { + return this.maximumMessageCount; + } + + @Override + public void setLoadProbe(ServerLoadProbe loadProbe) { + checkRunning(); + super.setLoadProbe(loadProbe); + } + + public void setGatewayTransportFilter( + List transportFilters) { + this.gatewayTransportFilters = transportFilters; + } + + @Override + public int getMessageTimeToLive() { + return this.messageTimeToLive; + } + + + public ClientSubscriptionConfig getClientSubscriptionConfig(){ + return this.clientSubscriptionConfig; + } + ++ public boolean isDefaultServer() { ++ return isDefaultServer; ++ } ++ ++ public void setIsDefaultServer() { ++ this.isDefaultServer = true; ++ } ++ + /** + * Sets the configuration of this CacheServer based on + * the configuration of another CacheServer. + */ + public void configureFrom(CacheServer other) { + setPort(other.getPort()); + setBindAddress(other.getBindAddress()); + setHostnameForClients(other.getHostnameForClients()); + setMaxConnections(other.getMaxConnections()); + setMaxThreads(other.getMaxThreads()); + setNotifyBySubscription(other.getNotifyBySubscription()); + setSocketBufferSize(other.getSocketBufferSize()); + setTcpNoDelay(other.getTcpNoDelay()); + setMaximumTimeBetweenPings(other.getMaximumTimeBetweenPings()); + setMaximumMessageCount(other.getMaximumMessageCount()); + setMessageTimeToLive(other.getMessageTimeToLive()); +// setTransactionTimeToLive(other.getTransactionTimeToLive()); not implemented in CacheServer for v6.6 + setGroups(other.getGroups()); + setLoadProbe(other.getLoadProbe()); + setLoadPollInterval(other.getLoadPollInterval()); + ClientSubscriptionConfig cscOther = other.getClientSubscriptionConfig(); + ClientSubscriptionConfig cscThis = this.getClientSubscriptionConfig(); + // added for configuration of ha overflow + cscThis.setEvictionPolicy(cscOther.getEvictionPolicy()); + cscThis.setCapacity(cscOther.getCapacity()); + String diskStoreName 
= cscOther.getDiskStoreName(); + if (diskStoreName != null) { + cscThis.setDiskStoreName(diskStoreName); + } else { + cscThis.setOverflowDirectory(cscOther.getOverflowDirectory()); + } + } + + @Override + public synchronized void start() throws IOException { + Assert.assertTrue(this.cache != null); + boolean isSqlFabricSystem = ((GemFireCacheImpl)this.cache).isSqlfSystem(); + + this.serialNumber = createSerialNumber(); + if (DynamicRegionFactory.get().isOpen()) { + // force notifyBySubscription to be true so that meta info is pushed + // from servers to clients instead of invalidates. + if (!this.notifyBySubscription) { + logger.info(LocalizedMessage.create(LocalizedStrings.CacheServerImpl_FORCING_NOTIFYBYSUBSCRIPTION_TO_SUPPORT_DYNAMIC_REGIONS)); + this.notifyBySubscription = true; + } + } + this.advisor = CacheServerAdvisor.createCacheServerAdvisor(this); + this.loadMonitor = new LoadMonitor(loadProbe, maxConnections, + loadPollInterval, FORCE_LOAD_UPDATE_FREQUENCY, + advisor); + List overflowAttributesList = new LinkedList(); + ClientSubscriptionConfig csc = this.getClientSubscriptionConfig(); + overflowAttributesList.add(0, csc.getEvictionPolicy()); + overflowAttributesList.add(1, Integer.valueOf(csc.getCapacity())); + overflowAttributesList.add(2, Integer.valueOf(this.port)); + String diskStoreName = csc.getDiskStoreName(); + if (diskStoreName != null) { + overflowAttributesList.add(3, diskStoreName); + overflowAttributesList.add(4, true); // indicator to use diskstore + } else { + overflowAttributesList.add(3, csc.getOverflowDirectory()); + overflowAttributesList.add(4, false); + } + + this.acceptor = new AcceptorImpl(getPort(), + getBindAddress(), + getNotifyBySubscription(), + getSocketBufferSize(), + getMaximumTimeBetweenPings(), + this.cache, + getMaxConnections(), + getMaxThreads(), + getMaximumMessageCount(), + getMessageTimeToLive(), + getTransactionTimeToLive(), + this.loadMonitor, + overflowAttributesList, + isSqlFabricSystem, + 
this.isGatewayReceiver, + this.gatewayTransportFilters, this.tcpNoDelay); + + this.acceptor.start(); + this.advisor.handshake(); + this.loadMonitor.start(new ServerLocation(getExternalAddress(), + getPort()), acceptor.getStats()); + + // TODO : Need to provide facility to enable/disable client health monitoring. + //Creating ClientHealthMonitoring region. + // Force initialization on current cache + if(cache instanceof GemFireCacheImpl) { + ClientHealthMonitoringRegion.getInstance((GemFireCacheImpl)cache); + } + this.cache.getLoggerI18n().config(LocalizedStrings.CacheServerImpl_CACHESERVER_CONFIGURATION___0, getConfig()); + + /* + * If the stopped bridge server is restarted, we'll need to re-register the + * client membership listener. If the listener is already registered it + * won't be registered as would the case when start() is invoked for the + * first time. + */ + ClientMembershipListener[] membershipListeners = + ClientMembership.getClientMembershipListeners(); + + boolean membershipListenerRegistered = false; + for (ClientMembershipListener membershipListener : membershipListeners) { + //just checking by reference as the listener instance is final + if (listener == membershipListener) { + membershipListenerRegistered = true; + break; + } + } + + if (!membershipListenerRegistered) { + ClientMembership.registerClientMembershipListener(listener); + } + + if (!isGatewayReceiver) { + InternalDistributedSystem system = ((GemFireCacheImpl) this.cache) + .getDistributedSystem(); + system.handleResourceEvent(ResourceEvent.CACHE_SERVER_START, this); + } + + } + + + /** + * Gets the address that this bridge server can be contacted on from external + * processes. 
+ * @since 5.7 + */ + public String getExternalAddress() { + return getExternalAddress(true); + } + + public String getExternalAddress(boolean checkServerRunning) { + if (checkServerRunning) { + if (!this.isRunning()) { + String s = "A bridge server's bind address is only available if it has been started"; + this.cache.getCancelCriterion().checkCancelInProgress(null); + throw new IllegalStateException(s); + } + } + if (this.hostnameForClients == null || this.hostnameForClients.equals("")) { + if (this.acceptor != null) { + return this.acceptor.getExternalAddress(); + } + else { + return null; + } + } + else { + return this.hostnameForClients; + } + } + + public boolean isRunning() { + return this.acceptor != null && this.acceptor.isRunning(); + } + + public synchronized void stop() { + if (!isRunning()) { + return; + } + + RuntimeException firstException = null; + + try { + if(this.loadMonitor != null) { + this.loadMonitor.stop(); + } + } catch(RuntimeException e) { + cache.getLoggerI18n().warning(LocalizedStrings.CacheServerImpl_CACHESERVER_ERROR_CLOSING_LOAD_MONITOR, e); + firstException = e; + } + + try { + if (this.advisor != null) { + this.advisor.close(); + } + } catch(RuntimeException e) { + cache.getLoggerI18n().warning(LocalizedStrings.CacheServerImpl_CACHESERVER_ERROR_CLOSING_ADVISOR, e); + firstException = e; + } + + try { + if (this.acceptor != null) { + this.acceptor.close(); + } + } catch(RuntimeException e) { + logger.warn(LocalizedMessage.create(LocalizedStrings.CacheServerImpl_CACHESERVER_ERROR_CLOSING_ACCEPTOR_MONITOR), e); + if (firstException != null) { + firstException = e; + } + } + + if(firstException != null) { + throw firstException; + } + + //TODO : We need to clean up the admin region created for client + //monitoring. + + // BridgeServer is still available, just not running, so we don't take + // it out of the cache's list... 
+ // cache.removeBridgeServer(this); + + /* Assuming start won't be called after stop */ + ClientMembership.unregisterClientMembershipListener(listener); + + TXManagerImpl txMgr = (TXManagerImpl) cache.getCacheTransactionManager(); + txMgr.removeHostedTXStatesForClients(); + + if (!isGatewayReceiver) { + InternalDistributedSystem system = ((GemFireCacheImpl) this.cache) + .getDistributedSystem(); + system.handleResourceEvent(ResourceEvent.CACHE_SERVER_STOP, this); + } + + } + + private String getConfig() { + ClientSubscriptionConfig csc = this.getClientSubscriptionConfig(); + String str = + "port=" + getPort() + " max-connections=" + getMaxConnections() + + " max-threads=" + getMaxThreads() + " notify-by-subscription=" + + getNotifyBySubscription() + " socket-buffer-size=" + + getSocketBufferSize() + " maximum-time-between-pings=" + + getMaximumTimeBetweenPings() + " maximum-message-count=" + + getMaximumMessageCount() + " message-time-to-live=" + + getMessageTimeToLive() + " eviction-policy=" + csc.getEvictionPolicy() + + " capacity=" + csc.getCapacity() + " overflow directory="; + if (csc.getDiskStoreName() != null) { + str += csc.getDiskStoreName(); + } else { + str += csc.getOverflowDirectory(); + } + str += + " groups=" + Arrays.asList(getGroups()) + + " loadProbe=" + loadProbe + + " loadPollInterval=" + loadPollInterval + + " tcpNoDelay=" + tcpNoDelay; + return str; + } + + @Override + public String toString() { + ClientSubscriptionConfig csc = this.getClientSubscriptionConfig(); + String str = + "CacheServer on port=" + getPort() + " client subscription config policy=" + + csc.getEvictionPolicy() + " client subscription config capacity=" + + csc.getCapacity(); + if (csc.getDiskStoreName() != null) { + str += " client subscription config overflow disk store=" + + csc.getDiskStoreName(); + } else { + str += " client subscription config overflow directory=" + + csc.getOverflowDirectory(); + } + return str; + } + + /** + * Test method used to access the internal 
acceptor + * + * @return the internal acceptor + */ + public AcceptorImpl getAcceptor() { + return this.acceptor; + } + + // DistributionAdvisee methods + + public DM getDistributionManager() { + return getSystem().getDistributionManager(); + } + + public ClientSession getClientSession(String durableClientId) { + return getCacheClientNotifier().getClientProxy(durableClientId); + } + + public ClientSession getClientSession(DistributedMember member) { + return getCacheClientNotifier().getClientProxy( + ClientProxyMembershipID.getClientId(member)); + } + + public Set getAllClientSessions() { + return new HashSet(getCacheClientNotifier().getClientProxies()); + } + + /** + * create client subscription + * + * @param cache + * @param ePolicy + * @param capacity + * @param port + * @param overFlowDir + * @param isDiskStore + * @return client subscription name + * @since 5.7 + */ + public static String clientMessagesRegion(GemFireCacheImpl cache, String ePolicy, + int capacity, int port, String overFlowDir, boolean isDiskStore) { + AttributesFactory factory = getAttribFactoryForClientMessagesRegion(cache, + ePolicy, capacity, overFlowDir, isDiskStore); + RegionAttributes attr = factory.create(); + + return createClientMessagesRegion(attr, cache, capacity, port); + } + + public static AttributesFactory getAttribFactoryForClientMessagesRegion( + GemFireCacheImpl cache, + String ePolicy, int capacity, String overflowDir, boolean isDiskStore) + throws InvalidValueException, GemFireIOException { + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + + if (isDiskStore) { + // overflowDir parameter is actually diskstore name + factory.setDiskStoreName(overflowDir); + // client subscription queue is always overflow to disk, so do async + // see feature request #41479 + factory.setDiskSynchronous(true); + } else if (overflowDir == null || overflowDir.equals(ClientSubscriptionConfig.DEFAULT_OVERFLOW_DIRECTORY)) { + factory.setDiskStoreName(null); + 
// client subscription queue is always overflow to disk, so do async + // see feature request #41479 + factory.setDiskSynchronous(true); + } else { + File dir = new File(overflowDir + File.separatorChar + + generateNameForClientMsgsRegion(OSProcess.getId())); + // This will delete the overflow directory when virtual machine terminates. + dir.deleteOnExit(); + if (!dir.mkdirs() && !dir.isDirectory()) { + throw new GemFireIOException("Could not create client subscription overflow directory: " + + dir.getAbsolutePath()); + } + File[] dirs = { dir }; + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + DiskStore bsi = dsf.setAutoCompact(true) + .setDiskDirsAndSizes(dirs, new int[] { Integer.MAX_VALUE }) + .create("bsi"); + factory.setDiskStoreName("bsi"); + // backward compatibility, it was sync + factory.setDiskSynchronous(true); + } + factory.setDataPolicy(DataPolicy.NORMAL); + // enable statistics + factory.setStatisticsEnabled(true); + /* setting LIFO related eviction attributes */ + if (HARegionQueue.HA_EVICTION_POLICY_ENTRY.equals(ePolicy)) { + factory + .setEvictionAttributes(EvictionAttributesImpl.createLIFOEntryAttributes( + capacity, EvictionAction.OVERFLOW_TO_DISK)); + } + else if (HARegionQueue.HA_EVICTION_POLICY_MEMORY.equals(ePolicy)) { // condition refinement + factory + .setEvictionAttributes(EvictionAttributesImpl.createLIFOMemoryAttributes( + capacity, EvictionAction.OVERFLOW_TO_DISK)); + } + else { + // throw invalid eviction policy exception + throw new InvalidValueException( + LocalizedStrings.CacheServerImpl__0_INVALID_EVICTION_POLICY.toLocalizedString(ePolicy)); + } + return factory; + } + + public static String createClientMessagesRegion(RegionAttributes attr, + GemFireCacheImpl cache, int capacity, int port) { + // generating unique name in VM for ClientMessagesRegion + String regionName = generateNameForClientMsgsRegion(port); + try { + cache.createVMRegion(regionName, attr, + new InternalRegionArguments().setDestroyLockFlag(true) + 
.setRecreateFlag(false).setSnapshotInputStream(null) + .setImageTarget(null).setIsUsedForMetaRegion(true)); + } + catch (RegionExistsException ree) { + InternalGemFireError assErr = new InternalGemFireError( + "unexpected exception"); + assErr.initCause(ree); + throw assErr; + } + catch (IOException e) { + // only if loading snapshot, not here + InternalGemFireError assErr = new InternalGemFireError( + "unexpected exception"); + assErr.initCause(e); + throw assErr; + } + catch (ClassNotFoundException e) { + // only if loading snapshot, not here + InternalGemFireError assErr = new InternalGemFireError( + "unexpected exception"); + assErr.initCause(e); + throw assErr; + } + return regionName; + } + + public static String createClientMessagesRegionForTesting(GemFireCacheImpl cache, + String ePolicy, int capacity, int port, int expiryTime, String overFlowDir, boolean isDiskStore) { + AttributesFactory factory = getAttribFactoryForClientMessagesRegion(cache, + ePolicy, capacity, overFlowDir, isDiskStore); + ExpirationAttributes ea = new ExpirationAttributes(expiryTime, + ExpirationAction.LOCAL_INVALIDATE); + factory.setEntryTimeToLive(ea); + RegionAttributes attr = factory.create(); + + return createClientMessagesRegion(attr, cache, capacity, port); + } + + /** + * Generates the name for the client subscription using the given id. 
+ * + * @param id + * @return String + * @since 5.7 + */ + public static String generateNameForClientMsgsRegion(int id) { + return ClientSubscriptionConfigImpl.CLIENT_SUBSCRIPTION + "_" + id; + } + + /* + * Marker class name to identify the lock more easily in thread dumps private + * static class ClientMessagesRegionLock extends Object { } + */ + public DistributionAdvisor getDistributionAdvisor() { + return this.advisor; + } + + /** + * Returns the BridgeServerAdvisor for this server + */ + public CacheServerAdvisor getCacheServerAdvisor() { + return this.advisor; + } + + public Profile getProfile() { + return getDistributionAdvisor().createProfile(); + } + + public DistributionAdvisee getParentAdvisee() { + return null; + } + + /** + * Returns the underlying InternalDistributedSystem connection. + * @return the underlying InternalDistributedSystem + */ + public InternalDistributedSystem getSystem() { + return (InternalDistributedSystem)this.cache.getDistributedSystem(); + } + + public String getName() { + return "CacheServer"; + } + + public String getFullPath() { + return getName(); + } + + private final static AtomicInteger profileSN = new AtomicInteger(); + + private static int createSerialNumber() { + return profileSN.incrementAndGet(); + } + + /** + * Returns an array of all the groups of this bridge server. + * This includes those from the groups gemfire property + * and those explicitly added to this server. 
+ */ + public String[] getCombinedGroups() { + ArrayList groupList = new ArrayList(); + for (String g: MemberAttributes.parseGroups(null, getSystem().getConfig().getGroups())) { + if (!groupList.contains(g)) { + groupList.add(g); + } + } + for (String g: getGroups()) { + if (!groupList.contains(g)) { + groupList.add(g); + } + } + String[] groups = new String[groupList.size()]; + return groupList.toArray(groups); + } + + public /*synchronized causes deadlock*/ void fillInProfile(Profile profile) { + assert profile instanceof CacheServerProfile; + CacheServerProfile bp = (CacheServerProfile)profile; + bp.setHost(getExternalAddress(false)); + bp.setPort(getPort()); + bp.setGroups(getCombinedGroups()); + bp.setMaxConnections(maxConnections); + bp.setInitialLoad(loadMonitor.getLastLoad()); + bp.setLoadPollInterval(getLoadPollInterval()); + bp.serialNumber = getSerialNumber(); + bp.finishInit(); + } + + public int getSerialNumber() { + return this.serialNumber; + } + + + protected CacheClientNotifier getCacheClientNotifier() { + return getAcceptor().getCacheClientNotifier(); + } + + /** + * Registers a new InterestRegistrationListener with the set of + * InterestRegistrationListeners. + * + * @param listener + * The InterestRegistrationListener to register + * @throws IllegalStateException if the BridgeServer has not been started + * @since 5.8Beta + */ + public void registerInterestRegistrationListener( + InterestRegistrationListener listener) { + if (!this.isRunning()) { + throw new IllegalStateException(LocalizedStrings.CacheServerImpl_MUST_BE_RUNNING.toLocalizedString()); + } + getCacheClientNotifier().registerInterestRegistrationListener(listener); + } + + /** + * Unregisters an existing InterestRegistrationListener from + * the set of InterestRegistrationListeners. 
+ * + * @param listener + * The InterestRegistrationListener to + * unregister + * + * @since 5.8Beta + */ + public void unregisterInterestRegistrationListener( + InterestRegistrationListener listener) { + getCacheClientNotifier().unregisterInterestRegistrationListener(listener); + } + + /** + * Returns a read-only set of InterestRegistrationListeners + * registered with this notifier. + * + * @return a read-only set of InterestRegistrationListeners + * registered with this notifier + * + * @since 5.8Beta + */ + public Set getInterestRegistrationListeners() { + return getCacheClientNotifier().getInterestRegistrationListeners(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerLauncher.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerLauncher.java index ac198e9,0000000..c5de84b mode 100755,000000..100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerLauncher.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerLauncher.java @@@ -1,1391 -1,0 +1,1393 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.gemstone.gemfire.internal.cache; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.PrintStream; +import java.io.Serializable; +import java.net.URL; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import com.gemstone.gemfire.SystemFailure; +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.partition.PartitionRegionHelper; +import com.gemstone.gemfire.cache.server.CacheServer; +import com.gemstone.gemfire.distributed.DistributedSystem; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.i18n.LogWriterI18n; +import com.gemstone.gemfire.internal.OSProcess; +import com.gemstone.gemfire.internal.PureJavaMode; +import com.gemstone.gemfire.internal.SocketCreator; ++import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; - import com.gemstone.gemfire.internal.logging.PureLogWriter; +import com.gemstone.gemfire.internal.process.StartupStatus; +import com.gemstone.gemfire.internal.process.StartupStatusListener; +import com.gemstone.gemfire.internal.util.IOUtils; +import com.gemstone.gemfire.internal.util.JavaCommandBuilder; + +/** + * Launcher program to start a cache server. 
+ * + * @author Sudhir Menon + * @author Barry Oglesby + * @author David Whitlock + * @author John Blum + * + * @since 2.0.2 + */ +public class CacheServerLauncher { + + /** Is this VM a dedicated Cache Server? This value is used mainly by the admin API. */ + public static boolean isDedicatedCacheServer = Boolean.getBoolean("gemfire.isDedicatedServer"); + + public static boolean ASSIGN_BUCKETS = Boolean.getBoolean("gemfire.CacheServerLauncher.assignBucketsToPartitions"); + + //default is to exit if property not defined + public static boolean DONT_EXIT_AFTER_LAUNCH = Boolean.getBoolean("gemfire.CacheServerLauncher.dontExitAfterLaunch"); + + /** Should the launch command be printed? */ + public static final boolean PRINT_LAUNCH_COMMAND = Boolean.getBoolean( + CacheServerLauncher.class.getSimpleName() + ".PRINT_LAUNCH_COMMAND"); + + private static final long STATUS_WAIT_TIME + = Long.getLong("gemfire.CacheServerLauncher.STATUS_WAIT_TIME_MS", 15000); + + /** How long to wait for a cache server to stop */ + private static final long SHUTDOWN_WAIT_TIME + = Long.getLong("gemfire.CacheServerLauncher.SHUTDOWN_WAIT_TIME_MS", 20000); + + protected final String baseName; + protected final String defaultLogFileName; + protected final String startLogFileName; + protected final String statusName; + protected Status status = null; + protected File workingDir = null; + protected PrintStream oldOut = System.out; + protected PrintStream oldErr = System.err; + protected LogWriterI18n logger = null; + protected String maxHeapSize; + protected String initialHeapSize; + protected String offHeapSize; + + + public static final int SHUTDOWN = 0; + public static final int STARTING = 1; + public static final int RUNNING = 2; + public static final int SHUTDOWN_PENDING = 3; + + private static final int FORCE_STATUS_FILE_READ_ITERATION_COUNT = 10; + + public CacheServerLauncher(final String baseName) { + assert baseName != null : "The base name used for the cache server launcher files cannot be 
null!"; + this.baseName = baseName; + final String baseNameLowerCase = baseName.toLowerCase().replace(" ", ""); + this.startLogFileName = "start_" + baseNameLowerCase + ".log"; + this.defaultLogFileName = baseNameLowerCase + ".log"; + this.statusName = "." + baseNameLowerCase + ".ser"; + } + + protected static Status createStatus(final String baseName, final int state, final int pid) { + return createStatus(baseName, state, pid, null, null); + } + + protected static Status createStatus(final String baseName, final int state, final int pid, final String msg, final Throwable t) { + final Status status = new Status(baseName); + status.state = state; + status.pid = pid; + status.msg = msg; + status.exception = t; + return status; + } + + /** + * Prints usage information about this program. + */ + protected void usage() { + PrintStream out = System.out; + out.println("cacheserver start [-J]* [=]* [-dir=] [-classpath=] [-disable-default-server] [-rebalance] [-lock-memory] [-server-port=] [-server-bind-address=] [-critical-heap-percentage=] [-eviction-heap-percentage=] [-critical-off-heap-percentage=] [-eviction-off-heap-percentage=]\n" ); + out.println("\t" + LocalizedStrings.CacheServerLauncher_STARTS_A_GEMFIRE_CACHESERVER_VM.toLocalizedString() ); + out.println("\t" + LocalizedStrings.CacheServerLauncher_VMARG.toLocalizedString()); + out.println("\t" + LocalizedStrings.CacheServerLauncher_DIR.toLocalizedString()); + out.println("\t" + LocalizedStrings.CacheServerLauncher_CLASSPATH.toLocalizedString()); + out.println("\t" + LocalizedStrings.CacheServerLauncher_ATTNAME.toLocalizedString()); + out.println("\t" + LocalizedStrings.CacheServerLauncher_REBALANCE.toLocalizedString()); + out.println("\t" + LocalizedStrings.CacheServerLauncher_DISABLE_DEFAULT_SERVER.toLocalizedString()); + out.println("\t" + LocalizedStrings.CacheServerLauncher_SERVER_PORT.toLocalizedString()); + out.println("\t" + LocalizedStrings.CacheServerLauncher_SERVER_BIND_ADDRESS.toLocalizedString()); + 
out.println("\t" + LocalizedStrings.CacheServerLauncher_CRITICAL_HEAP_PERCENTAGE.toLocalizedString()); + out.println("\t" + LocalizedStrings.CacheServerLauncher_EVICTION_HEAP_PERCENTAGE.toLocalizedString()); + out.println("\t" + LocalizedStrings.CacheServerLauncher_CRITICAL_OFF_HEAP_PERCENTAGE.toLocalizedString()); + out.println("\t" + LocalizedStrings.CacheServerLauncher_EVICTION_OFF_HEAP_PERCENTAGE.toLocalizedString()); + out.println("\t" + LocalizedStrings.CacheServerLauncher_LOCK_MEMORY.toLocalizedString()); + + out.println(); + out.println( "cacheserver stop [-dir=]" ); + out.println("\t" + LocalizedStrings.CacheServerLauncher_STOPS_A_GEMFIRE_CACHESERVER_VM.toLocalizedString()); + out.println("\t" + LocalizedStrings.CacheServerLauncher_DIR.toLocalizedString()); + out.println(); + out.println( "cacheserver status [-dir=]" ); + out.println( "\t" + LocalizedStrings.CacheServerLauncher_STATUS.toLocalizedString()); + out.println("\t" + LocalizedStrings.CacheServerLauncher_DIR.toLocalizedString()); + } + + /** + * Prints the status of the cache server running in the configured + * working directory (the DIR entry produced by getStopOptions) and + * then exits the VM with exit code 0. + * + * @param args the command-line arguments, handed to getStopOptions + */ + protected void status(final String[] args) throws Exception { + workingDir = (File) getStopOptions(args).get(DIR); + System.out.println(getStatus()); + System.exit(0); + } + + /** + * Returns the Status of the cache server in the + * workingDir: the result of spinReadStatus() when the status file + * exists there, otherwise a newly created SHUTDOWN status with pid 0. + */ + protected Status getStatus() throws Exception { + Status status; + + if (new File(workingDir, statusName).exists()) { + status = spinReadStatus(); // See bug 32456 + } + else { + // no pid since the cache server is not running + status = createStatus(this.baseName, SHUTDOWN, 0); + } + + return status; + } + + /** + * Main method that parses the command line and + * starts, stops, or gets the status of a cache server. This main + * method is also the main method of the launched cache server VM + * ("server" mode). 
+ */ + public static void main(final String[] args) { + final CacheServerLauncher launcher = new CacheServerLauncher("CacheServer"); + boolean inServer = false; + + try { + if (args.length > 0) { + if (args[0].equalsIgnoreCase("start")) { + launcher.start(args); + } + else if (args[0].equalsIgnoreCase("server")) { + inServer = true; + launcher.server(args); + } + else if (args[0].equalsIgnoreCase("stop")) { + launcher.stop(args); + } + else if (args[0].equalsIgnoreCase("status")) { + launcher.status(args); + } + else { + launcher.usage(); + System.exit(1); + } + } + else { + launcher.usage(); + System.exit(1); + } + + throw new Exception(LocalizedStrings.CacheServerLauncher_INTERNAL_ERROR_SHOULDNT_REACH_HERE.toLocalizedString()); + } + catch (VirtualMachineError err) { + SystemFailure.initiateFailure(err); + // If this ever returns, rethrow the error. We're poisoned + // now, so don't let this thread continue. + throw err; + } + catch (Throwable t) { + // Whenever you catch Error or Throwable, you must also + // catch VirtualMachineError (see above). 
However, there is + // _still_ a possibility that you are dealing with a cascading + // error condition, so you also need to check to see if the JVM + // is still usable: + SystemFailure.checkFailure(); + t.printStackTrace(); + if (inServer) { + launcher.setServerError(LocalizedStrings.CacheServerLauncher_ERROR_STARTING_SERVER_PROCESS + .toLocalizedString(), t); + } + launcher.restoreStdOut(); + if (launcher.logger != null) { + launcher.logger.severe(LocalizedStrings.CacheServerLauncher_CACHE_SERVER_ERROR, t); + + } + else { + System.out.println(LocalizedStrings.CacheServerLauncher_ERROR_0.toLocalizedString(t.getMessage())); + } + System.exit(1); + } + } + + protected void restoreStdOut( ) { + System.setErr( oldErr ); + System.setOut( oldOut ); + } + + protected static final String DIR = "dir"; + protected static final String VMARGS = "vmargs"; + protected static final String PROPERTIES = "properties"; + protected static final String CLASSPATH = "classpath"; + protected static final String REBALANCE = "rebalance"; + protected static final String SERVER_PORT = "server-port"; + protected static final String SERVER_BIND_ADDRESS = "server-bind-address"; + protected static final String DISABLE_DEFAULT_SERVER = "disable-default-server"; + public static final String CRITICAL_HEAP_PERCENTAGE = + "critical-heap-percentage"; + public static final String EVICTION_HEAP_PERCENTAGE = + "eviction-heap-percentage"; + public static final String CRITICAL_OFF_HEAP_PERCENTAGE = + "critical-off-heap-percentage"; + public static final String EVICTION_OFF_HEAP_PERCENTAGE = + "eviction-off-heap-percentage"; + protected static final String LOCK_MEMORY = "lock-memory"; + + protected final File processDirOption(final Map options, final String dirValue) throws FileNotFoundException { + final File inputWorkingDirectory = new File(dirValue); + + if (!inputWorkingDirectory.exists()) { + throw new 
FileNotFoundException(LocalizedStrings.CacheServerLauncher_THE_INPUT_WORKING_DIRECTORY_DOES_NOT_EXIST_0 + .toLocalizedString(dirValue)); + } + + options.put(DIR, inputWorkingDirectory); + + return inputWorkingDirectory; + } + + /** + * Populates a map that maps the name of the start options such as {@link #DIR} to its value on the command line. + * If no value is specified on the command line, a default one is provided. + */ + protected Map getStartOptions(String[] args) throws Exception { + final Map options = new HashMap(); + options.put(DIR, new File(System.getProperty("user.dir"))); + + final List vmArgs = new ArrayList(); + options.put(VMARGS, vmArgs); + + final Properties props = new Properties(); + options.put(PROPERTIES, props); + + for (final String arg : args) { + if (arg.equals("start")) { + // expected + } + else if (arg.startsWith("-classpath=")) { + options.put(CLASSPATH, arg.substring(arg.indexOf("=") + 1)); + } + else if (arg.startsWith("-dir=")) { + processDirOption(options, arg.substring(arg.indexOf("=") + 1)); + } + else if (arg.startsWith("-disable-default-server")) { + options.put(DISABLE_DEFAULT_SERVER, arg); + } + else if (arg.startsWith("-lock-memory")) { + if (System.getProperty("os.name").indexOf("Windows") >= 0) { + throw new IllegalArgumentException("Unable to lock memory on this operating system"); + } + props.put(DistributionConfig.LOCK_MEMORY_NAME, "true"); + } + else if (arg.startsWith("-rebalance")) { + options.put(REBALANCE, Boolean.TRUE); + } + else if (arg.startsWith("-server-port")) { + options.put(SERVER_PORT, arg); + } + else if (arg.startsWith("-" + CRITICAL_HEAP_PERCENTAGE) ) { + options.put(CRITICAL_HEAP_PERCENTAGE, arg); + } + else if (arg.startsWith("-" + EVICTION_HEAP_PERCENTAGE) ) { + options.put(EVICTION_HEAP_PERCENTAGE, arg); + } + else if (arg.startsWith("-" + CRITICAL_OFF_HEAP_PERCENTAGE) ) { + options.put(CRITICAL_OFF_HEAP_PERCENTAGE, arg); + } + else if (arg.startsWith("-" + EVICTION_OFF_HEAP_PERCENTAGE) ) { + 
options.put(EVICTION_OFF_HEAP_PERCENTAGE, arg); + } + else if (arg.startsWith("-server-bind-address")) { + options.put(SERVER_BIND_ADDRESS, arg); + } + else if (arg.startsWith("-J")) { + String vmArg = arg.substring(2); + if (vmArg.startsWith("-Xmx")) { + this.maxHeapSize = vmArg.substring(4); + } else if (vmArg.startsWith("-Xms")) { + this.initialHeapSize = vmArg.substring(4); + } + vmArgs.add(vmArg); + } + // This default block comes after the "-J" branch so that "-J" options can have '=' in them. + // Requiring the 'indexOf' of the assignment operator to be greater than 0 ensures a non-empty String key + else if (arg.indexOf("=") > 0) { + final int assignmentIndex = arg.indexOf("="); + final String key = arg.substring(0, assignmentIndex); + final String value = arg.substring(assignmentIndex + 1); + + if (key.startsWith("-")) { + processStartOption(key.substring(1), value, options, vmArgs, props); + } + else { + processStartArg(key, value, options, vmArgs, props); + } + } + else { + throw new IllegalArgumentException(LocalizedStrings.CacheServerLauncher_UNKNOWN_ARGUMENT_0 + .toLocalizedString(arg)); + } + } + + // -J-Djava.awt.headless=true has been added for Mac platform where it + // causes an icon to appear for sqlf launched procs + // TODO: check which library/GemFire code causes awt to be touched + vmArgs.add("-Djava.awt.headless=true"); + + // configure commons-logging to use JDK logging + vmArgs.add("-Dorg.apache.commons.logging.Log=org.apache.commons.logging.impl.Jdk14Logger"); + + options.put(VMARGS, vmArgs); + return options; + } + + /** + * Processes a command-line argument of the form "key=value" (no leading + * dash) by storing it in props. The options and vmArgs parameters are + * not used by this implementation. + * + * @param key the text before the first = of the argument + * @param value the text after the first = of the argument + * @param props the properties that receive the key/value pair + */ + protected void processStartArg(final String key, + final String value, + final Map options, + final List vmArgs, + final Properties props) + throws Exception + { + props.setProperty(key, value); + } + + /** + * Process a command-line option of the form "-key=value". 
+ */ + protected void processStartOption(final String key, + final String value, + final Map options, + final List vmArgs, + final Properties props) + throws Exception + { + processUnknownStartOption(key, value, options, vmArgs, props); + } + + /** + * Process a command-line option of the form "-key=value" unknown to the base class. + */ + protected void processUnknownStartOption(final String key, + final String value, + final Map options, + final List vmArgs, + final Properties props) { + throw new IllegalArgumentException(LocalizedStrings.CacheServerLauncher_UNKNOWN_ARGUMENT_0.toLocalizedString(key)); + } + + /** + * Extracts configuration information used when launching the cache server VM. + */ + protected Map getServerOptions(final String[] args) throws Exception { + final Map options = new HashMap(); + options.put(DIR, new File(".")); + workingDir = (File) options.get(DIR); + + final Properties props = new Properties(); + options.put(PROPERTIES, props); + + for (final String arg : args) { + if (arg.equals("server")) { + // expected + } + else if (arg.startsWith("-dir=")) { + this.workingDir = processDirOption(options, arg.substring(arg.indexOf("=") + 1)); + } + else if (arg.startsWith("-rebalance")) { + options.put(REBALANCE, Boolean.TRUE); + } + else if (arg.startsWith("-disable-default-server")) { + options.put(DISABLE_DEFAULT_SERVER, Boolean.TRUE); + } + else if (arg.startsWith("-lock-memory")) { + props.put(DistributionConfig.LOCK_MEMORY_NAME, "true"); + } + else if (arg.startsWith("-server-port")) { + options.put(SERVER_PORT, arg.substring(arg.indexOf("=") + 1)); + } + else if (arg.startsWith("-server-bind-address")) { + options.put(SERVER_BIND_ADDRESS, arg.substring(arg.indexOf("=") + 1)); + } + else if (arg.startsWith("-" + CRITICAL_HEAP_PERCENTAGE)) { + options.put(CRITICAL_HEAP_PERCENTAGE, arg.substring(arg.indexOf("=") + 1)); + } + else if (arg.startsWith("-" + EVICTION_HEAP_PERCENTAGE)) { + options.put(EVICTION_HEAP_PERCENTAGE, 
arg.substring(arg.indexOf("=") + 1)); + } + else if (arg.startsWith("-" + CRITICAL_OFF_HEAP_PERCENTAGE)) { + options.put(CRITICAL_OFF_HEAP_PERCENTAGE, arg.substring(arg.indexOf("=") + 1)); + } + else if (arg.startsWith("-" + EVICTION_OFF_HEAP_PERCENTAGE)) { + options.put(EVICTION_OFF_HEAP_PERCENTAGE, arg.substring(arg.indexOf("=") + 1)); + } + else if (arg.indexOf("=") > 1) { + final int assignmentIndex = arg.indexOf("="); + final String key = arg.substring(0, assignmentIndex); + final String value = arg.substring(assignmentIndex + 1); + + if (key.startsWith("-")) { + options.put(key.substring(1), value); + } + else { + props.setProperty(key, value); + } + } + else { + throw new IllegalArgumentException(LocalizedStrings.CacheServerLauncher_UNKNOWN_ARGUMENT_0.toLocalizedString(arg)); + } + } + + return options; + } + + /** + * Extracts configuration information for stopping a cache server based on the contents of the command-line. + * This method can also be used with getting the status of a cache server. + */ + protected Map getStopOptions(final String[] args) throws Exception { + final Map options = new HashMap(); + options.put(DIR, new File(".")); + + for (final String arg : args) { + if (arg.equals("stop") || arg.equals("status")) { + // expected + } + else if (arg.startsWith("-dir=")) { + processDirOption(options, arg.substring(arg.indexOf("=") + 1)); + } + else { + throw new IllegalArgumentException(LocalizedStrings.CacheServerLauncher_UNKNOWN_ARGUMENT_0 + .toLocalizedString(arg)); + } + } + + return options; + } + + /** + * Configures and spawns a VM that hosts a cache server. Any output + * generated while starting the VM will be placed in a special + * {@linkplain #startLogFileName log file}. 
+ * + * See #getStartOptions + * @see OSProcess#bgexec(String[], File, File, boolean, Map) + */ + public void start(final String[] args) throws Exception { + final Map options = getStartOptions(args); + + workingDir = (File) options.get(DIR); + + // Complain if a cache server is already running in the specified working directory. + // See bug 32574. + verifyAndClearStatus(); + + // start the GemFire Cache Server process... + runCommandLine(options, buildCommandLine(options)); + + // wait for status.state == RUNNING + waitForRunning(); + + if (DONT_EXIT_AFTER_LAUNCH) { + return; + } + + System.exit(0); + } + + /** + * Throws an IllegalStateException when getStatus() reports a state other + * than SHUTDOWN for the working directory; otherwise calls deleteStatus(). + */ + private void verifyAndClearStatus() throws Exception { + final Status status = getStatus(); + + if (status != null && status.state != SHUTDOWN) { + throw new IllegalStateException(LocalizedStrings.CacheServerLauncher_A_0_IS_ALREADY_RUNNING_IN_DIRECTORY_1_2 + .toLocalizedString(this.baseName, workingDir, status)); + } + + deleteStatus(); + } + + /** + * Builds the command line for the server VM: the command returned by + * JavaCommandBuilder.buildCommand for this class, the CLASSPATH option + * and the VMARGS option, followed by the server argument and whatever + * addToServerCommand appends for the given options. + */ + private String[] buildCommandLine(final Map options) { + final List commandLine = JavaCommandBuilder.buildCommand(this.getClass().getName(), + (String) options.get(CLASSPATH), null, (List) options.get(VMARGS)); + + commandLine.add("server"); + addToServerCommand(commandLine, options); + + return commandLine.toArray(new String[commandLine.size()]); + } + + /** + * Prints the launch command to System.out, each element followed by a + * space, but only when PRINT_LAUNCH_COMMAND is set. + */ + private void printCommandLine(final String[] commandLine) { + if (PRINT_LAUNCH_COMMAND) { + System.out.println("Starting " + this.baseName + " with command:"); + for (final String command : commandLine) { + System.out.print(command); + System.out.print(" "); + } + System.out.println(); + } + } + + /** + * Deletes any existing start log file in the working directory, lets + * SocketCreator.readSSLProperties fill the environment map, launches the + * command through OSProcess.bgexec with the start log file as its log, + * prints the start message and returns the pid of the launched process. + * + * @throws IOException if an existing start log file cannot be deleted + */ + private int runCommandLine(final Map options, final String[] commandLine) throws Exception { + final File startLogFile = new File(workingDir, startLogFileName).getAbsoluteFile(); // see bug 32548 + + if (startLogFile.exists() && !startLogFile.delete()) { + throw new IOException("Unable to delete start log file (" + startLogFile.getAbsolutePath() + ")!"); + } + + Map env = new 
HashMap(); + // read the passwords from command line + SocketCreator.readSSLProperties(env); + + printCommandLine(commandLine); + + final int pid = OSProcess.bgexec(commandLine, workingDir, startLogFile, false, env); + + printStartMessage(options, pid); + + return pid; + } + + protected void printStartMessage(final Map options, final int pid) throws Exception { + System.out.println(LocalizedStrings.CacheServerLauncher_STARTING_0_WITH_PID_1.toLocalizedString(this.baseName, pid)); + } + + /** + * Sets the status of the cache server to be {@link #RUNNING}. + */ + public void running() { + try { + writeStatus(createStatus(this.baseName, RUNNING, OSProcess.getId())); + } + catch (Exception e) { + e.printStackTrace(); + } + } + + public static ThreadLocal serverPort = new ThreadLocal(); + + public static ThreadLocal serverBindAddress = new ThreadLocal(); + + public static Integer getServerPort() { + return serverPort.get(); + } + + public static String getServerBindAddress() { + return serverBindAddress.get(); + } + + public static ThreadLocal disableDefaultServer = new ThreadLocal(); + + public static Boolean getDisableDefaultServer() { + return disableDefaultServer.get(); + } + + /** + * The method that does the work of being a cache server. It is + * invoked in the VM spawned by the {@link #start} method. + * Basically, it creates a GemFire {@link Cache} based on + * configuration passed in from the command line. (It will also + * take gemfire.properties, etc. into account, just + * like an application cache.) + * + *

+ * + * After creating the cache and setting the server's status to {@link + * #RUNNING}, it periodically monitors the status, waiting for it to + * change to {@link #SHUTDOWN_PENDING} (see {@link #stop}). When + * the status does change, it closes the Cache and sets + * the status to be {@link #SHUTDOWN}. + * + * @param args Configuration options passed in from the command line + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD") + public void server(final String[] args) throws Exception { + isDedicatedCacheServer = true; + SystemFailure.setExitOK(true); + + final Map options = getServerOptions(args); + + final String serverPortString = (String) options.get(SERVER_PORT); + + if (serverPortString != null) { + serverPort.set(Integer.parseInt(serverPortString)); + } + + serverBindAddress.set((String) options.get(SERVER_BIND_ADDRESS)); + disableDefaultServer.set((Boolean) options.get(DISABLE_DEFAULT_SERVER)); + workingDir = new File(System.getProperty("user.dir")); + + // Say that we're starting... + Status originalStatus = createStatus(this.baseName, STARTING, OSProcess.getId()); + status = originalStatus; + writeStatus(status); + + // Connect to the distributed system. The properties will + // properly configure logging, the declarative caching file, etc. + final Properties props = (Properties) options.get(PROPERTIES); + + if (props.getProperty(DistributionConfig.LOG_FILE_NAME) == null && CacheServerLauncher.isLoggingToStdOut()) { + // Check First if the gemfire.properties set the log-file. 
If they do, we shouldn't override that default + final Properties gemfireProperties = new Properties(); + + DistributionConfigImpl.loadGemFireProperties(gemfireProperties); + + if (gemfireProperties.get(DistributionConfig.LOG_FILE_NAME) == null) { + // Do not allow the cache server to log to stdout, override the logger with #defaultLogFileName + props.setProperty(DistributionConfig.LOG_FILE_NAME, defaultLogFileName); + } + } + + InternalDistributedSystem system = this.connect(props); + + installLogListener(); + + logger = system.getLogWriter().convertToLogWriterI18n(); + // redirect output to the log file + OSProcess.redirectOutput(system.getConfig().getLogFile()); + + Cache cache = this.createCache(system, options); + cache.setIsServer(true); + startAdditionalServices(cache, options); + + this.running(); + + clearLogListener(); + + if (ASSIGN_BUCKETS) { + for (PartitionedRegion region : ((GemFireCacheImpl) cache).getPartitionedRegions()) { + PartitionRegionHelper.assignBucketsToPartitions(region); + } + } + + if (Boolean.TRUE.equals(options.get(REBALANCE))) { + cache.getResourceManager().createRebalanceFactory().start(); + } + + File statusFile = new File( workingDir, statusName ); + long lastModified=0, oldModified = statusFile.lastModified(); + // Every FORCE_STATUS_FILE_READ_ITERATION_COUNT iterations, read the status file despite the modification time + // to catch situations where the file is modified quicker than the file timestamp's resolution. + short count = 0; + boolean loggedWarning = false; + while(true) { + lastModified = statusFile.lastModified(); + if (lastModified > oldModified || count++ == FORCE_STATUS_FILE_READ_ITERATION_COUNT) { + count = 0; + Thread.sleep( 500 ); // allow for it to be finished writing. + //Sometimes the status file is partially written causing readObject to + //fail, sleep and retry. 
+ try { + status = readStatus( ); + } catch(IOException ioeSecondChance) { + Thread.sleep(1000); + try { + status = readStatus( ); + } catch(IOException ioeThirdChance) { + Thread.sleep(5000); + try { + status = readStatus( ); + } catch (FileNotFoundException fnfe) { + // See bug 44627. + // The cache server used to just shutdown at this point. Instead, + // recreate the status file if possible and continue. + status = createStatus(this.baseName, RUNNING, originalStatus.pid); + try { + writeStatus(status); + } catch (FileNotFoundException e) { + if (!loggedWarning) { + logger.warning(LocalizedStrings.CacheServerLauncher_CREATE_STATUS_EXCEPTION_0, e.toString()); + loggedWarning = true; + } + } + } + } + } + oldModified = lastModified; + if (status.state == SHUTDOWN_PENDING) { + stopAdditionalServices(); + this.disconnect(cache); + status.state = SHUTDOWN; + writeStatus(status); + } else { + Thread.sleep( 250 ); + } + + } else { + Thread.sleep(1000); + } + if (!system.isConnected()) { +// System.out.println("System is disconnected. 
isReconnecting = " + system.isReconnecting()); + boolean reconnected = false; + if (system.isReconnecting()) { + reconnected = system.waitUntilReconnected(-1, TimeUnit.SECONDS); + if (reconnected) { + system = (InternalDistributedSystem)system.getReconnectedSystem(); + cache = GemFireCacheImpl.getInstance(); + } + } + if (!reconnected) { + // shutdown-all disconnected the DS + System.exit(0); + } + } + } + } + + private void installLogListener() { + MainLogReporter reporter = new MainLogReporter(this.status); + StartupStatus.setListener(reporter); + reporter.setDaemon(true); + reporter.start(); + } + + private void clearLogListener() { + MainLogReporter mainLogListener = (MainLogReporter) StartupStatus.getStartupListener(); + if(mainLogListener != null) { + mainLogListener.shutdown(); + StartupStatus.clearListener(); + } + } + + protected InternalDistributedSystem connect(Properties props) { + return (InternalDistributedSystem)DistributedSystem.connect(props); + } + + protected static float getCriticalHeapPercent(Map options) { + if (options != null) { + String criticalHeapThreshold = (String)options + .get(CRITICAL_HEAP_PERCENTAGE); + if (criticalHeapThreshold != null) { + return Float.parseFloat(criticalHeapThreshold + .substring(criticalHeapThreshold.indexOf("=") + 1)); + } + } + return -1.0f; + } + + protected static float getEvictionHeapPercent(Map options) { + if (options != null) { + String evictionHeapThreshold = (String)options + .get(EVICTION_HEAP_PERCENTAGE); + if (evictionHeapThreshold != null) { + return Float.parseFloat(evictionHeapThreshold + .substring(evictionHeapThreshold.indexOf("=") + 1)); + } + } + return -1.0f; + } + + protected static float getCriticalOffHeapPercent(Map options) { + if (options != null) { + String criticalOffHeapThreshold = (String)options + .get(CRITICAL_OFF_HEAP_PERCENTAGE); + if (criticalOffHeapThreshold != null) { + return Float.parseFloat(criticalOffHeapThreshold + .substring(criticalOffHeapThreshold.indexOf("=") + 1)); 
+ } + } + return -1.0f; + } + /** + * Returns the eviction off-heap percentage found in options: the text + * after the first = of the EVICTION_OFF_HEAP_PERCENTAGE entry (the whole + * entry when it contains no =) parsed as a float, or -1.0f when options + * or that entry is null. + */ + protected static float getEvictionOffHeapPercent(Map options) { + if (options != null) { + String evictionOffHeapThreshold = (String)options + .get(EVICTION_OFF_HEAP_PERCENTAGE); + if (evictionOffHeapThreshold != null) { + return Float.parseFloat(evictionOffHeapThreshold + .substring(evictionOffHeapThreshold.indexOf("=") + 1)); + } + } + return -1.0f; + } + /** + * Creates the cache on the given distributed system and configures it + * from options: each critical/eviction heap and off-heap percentage that + * is greater than zero is set on the resource manager. Unless the + * disableDefaultServer thread-local is true or the cache already has + * cache servers, a default cache server is added, marked as the default + * server, given the configured port and bind address when those are set, + * and started. + * + * @param system the connected distributed system + * @param options the parsed command-line options + * @return the created cache + */ + protected Cache createCache(InternalDistributedSystem system, Map options) throws IOException { + Cache cache = CacheFactory.create(system); + + float threshold = getCriticalHeapPercent(options); + if (threshold > 0.0f) { + cache.getResourceManager().setCriticalHeapPercentage(threshold); + } + threshold = getEvictionHeapPercent(options); + if (threshold > 0.0f) { + cache.getResourceManager().setEvictionHeapPercentage(threshold); + } + + threshold = getCriticalOffHeapPercent(options); + getCriticalOffHeapPercent(options); /* NOTE(review): the result of this second call is discarded; it looks redundant - confirm before removing */ + if (threshold > 0.0f) { + cache.getResourceManager().setCriticalOffHeapPercentage(threshold); + } + threshold = getEvictionOffHeapPercent(options); + if (threshold > 0.0f) { + cache.getResourceManager().setEvictionOffHeapPercentage(threshold); + } + + + // Create and start a default cache server + // If (disableDefaultServer is not set or it is set but false) AND (the number of cacheservers is 0) + Boolean disable = disableDefaultServer.get(); + if ((disable == null || !disable) && cache.getCacheServers().size() == 0) { + // Create and add a cache server + CacheServer server = cache.addCacheServer(); ++ ++ CacheServerHelper.setIsDefaultServer(server); + + // Set its port if necessary + Integer serverPort = CacheServerLauncher.getServerPort(); + if (serverPort != null) { + server.setPort(serverPort); + } + + // Set its bind address if necessary + String serverBindAddress = getServerBindAddress(); + if (serverBindAddress != null) { + server.setBindAddress(serverBindAddress.trim()); + } + + // Start it + server.start(); + } + + return cache; + } + + /** + * Closes the given cache and then disconnects its distributed system. + */ + protected 
void disconnect(Cache cache) { + DistributedSystem dsys = cache.getDistributedSystem(); + cache.close(); + dsys.disconnect(); + } + + /** + * Stops a cache server (which is running in a different VM) by setting its status to {@link #SHUTDOWN_PENDING}. + * Waits for the cache server to actually shut down. + */ + public void stop(final String[] args) throws Exception { + this.workingDir = (File) getStopOptions(args).get(DIR); + + // determine the current state of the Cache Server process... + final File statusFile = new File(this.workingDir, this.statusName); + int exitStatus = 1; + + if (statusFile.exists()) { + this.status = spinReadStatus(); + + // upon reading the status file, request the Cache Server to shutdown if it has not already... + if (this.status.state != SHUTDOWN) { + // copy server PID and not use own PID; see bug #39707 + this.status = createStatus(this.baseName, SHUTDOWN_PENDING, this.status.pid); + writeStatus(this.status); + } + + // poll the Cache Server for a response to our shutdown request (passes through if the Cache Server + // has already shutdown)... + pollCacheServerForShutdown(); + + // after polling, determine the status of the Cache Server one last time and determine how to exit... 
+ if (this.status.state == SHUTDOWN) { + System.out.println(LocalizedStrings.CacheServerLauncher_0_STOPPED.toLocalizedString(this.baseName)); + deleteStatus(); + exitStatus = 0; + } + else { + System.out.println(LocalizedStrings.CacheServerLauncher_TIMEOUT_WAITING_FOR_0_TO_SHUTDOWN_STATUS_IS_1 + .toLocalizedString(this.baseName, this.status)); + } + } + else { + System.out.println(LocalizedStrings.CacheServerLauncher_THE_SPECIFIED_WORKING_DIRECTORY_0_CONTAINS_NO_STATUS_FILE + .toLocalizedString(this.workingDir)); + } + + if (DONT_EXIT_AFTER_LAUNCH) { + return; + } + + System.exit(exitStatus); + } + + private void pollCacheServerForShutdown() throws InterruptedException { + final int increment = 250; // unit is in milliseconds + int clock = 0; + + // wait for a total of 20000 milliseconds (or 20 seconds) + while (clock < SHUTDOWN_WAIT_TIME && status.state != SHUTDOWN) { + try { + status = readStatus(); + } + catch (IOException ignore) { + } + + try { + Thread.sleep(increment); + } + catch (InterruptedException ie) { + break; + } + + clock += increment; + } + } + + /** + * A class that represents the status of a cache server. Instances + * of this class are serialized to a {@linkplain #statusName file} + * on disk. 
+ * + * @see #SHUTDOWN + * @see #STARTING + * @see #RUNNING + * @see #SHUTDOWN_PENDING + */ + static class Status implements Serializable { + + private static final long serialVersionUID = 190943081363646485L; + public int state = 0; + public int pid = 0; + + private final String baseName; + public Throwable exception; + public String msg; + public String dsMsg; + + public Status(String baseName) { + this.baseName = baseName; + } + + @Override + public String toString() { + final StringBuilder buffer = new StringBuilder(); + buffer.append(this.baseName).append(" pid: ").append(pid).append(" status: "); + switch (state) { + case SHUTDOWN: + buffer.append("stopped"); + break; + case STARTING: + buffer.append("starting"); + break; + case RUNNING: + buffer.append("running"); + break; + case SHUTDOWN_PENDING: + buffer.append("stopping"); + break; + default: + buffer.append("unknown"); + break; + } + if (exception != null) { + if (msg != null) { + buffer.append("\n").append(msg).append(" - "); + } + else { + buffer.append("\nException in ").append(this.baseName).append(" - "); + } + buffer.append(LocalizedStrings + .CacheServerLauncher_SEE_LOG_FILE_FOR_DETAILS.toLocalizedString()); + } + else if (this.dsMsg != null) { + buffer.append('\n').append(this.dsMsg); + } + return buffer.toString(); + } + } + + /** + * Notes that an error has occurred in the cache server and that it + * has shut down because of it. + */ + protected void setServerError(final String msg, final Throwable t) { + try { + writeStatus(createStatus(this.baseName, SHUTDOWN, OSProcess.getId(), msg, t)); + } + catch (Exception e) { + if (logger != null) { + logger.severe(e); + } + else { + e.printStackTrace(); + } + System.exit(1); + } + } + + /** + * Sets the status of a cache server by serializing a Status + * instance to a file in the server's working directory. 
+ */ + public void writeStatus(final Status s) throws IOException { + FileOutputStream fileOutput = null; + ObjectOutputStream objectOutput = null; + + try { + fileOutput = new FileOutputStream(new File(workingDir, statusName)); + objectOutput = new ObjectOutputStream(fileOutput); + objectOutput.writeObject(s); + objectOutput.flush(); + } + finally { + IOUtils.close(objectOutput); + IOUtils.close(fileOutput); + } + } + + /** + * Reads a cache server's status. If the status file cannot be read because of I/O problems, it will try again. + */ + protected Status spinReadStatus() { + final long timeout = (System.currentTimeMillis() + 60000); + Status status = null; + + while (status == null && System.currentTimeMillis() < timeout) { + try { + status = readStatus(); + } + catch (Exception e) { + // try again - the status might have been read in the middle of it being written by the server resulting in + // an EOFException here + try { + Thread.sleep(500); + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + status = null; + break; + } + } + } + + return status; + } + + /** + * Reads a cache server's status from a file in its working directory. + */ + protected Status readStatus() throws InterruptedException, IOException { + final File statusFile = new File(workingDir, statusName); + + FileInputStream fileInput = null; + ObjectInputStream objectInput = null; + + try { + fileInput = new FileInputStream(statusFile); + objectInput = new ObjectInputStream(fileInput); + + Status status = (Status) objectInput.readObject(); + + // See bug 32760 + // Note, only execute the conditional createStatus statement if we are in native mode; if we are in pure Java mode + // the the process ID identified in the Status object is assumed to exist! 
+ if (!isExistingProcess(status.pid)) { + status = createStatus(this.baseName, SHUTDOWN, status.pid); + } + + return status; + } + catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + catch (FileNotFoundException e) { + Thread.sleep(500); + + if (statusFile.exists()) { + return readStatus(); + } + else { + throw e; + } + } + finally { + IOUtils.close(objectInput); + IOUtils.close(fileInput); + } + } + + /** + * Removes a cache server's status file + */ + private void deleteStatus() throws IOException { + final File statusFile = new File(workingDir, statusName); + + if (statusFile.exists() && !statusFile.delete()) { + throw new IOException("Could not delete status file (" + statusFile.getAbsolutePath() + ")!"); + } + } + + protected boolean isExistingProcess(final int pid) { + return (PureJavaMode.isPure() || (pid != 0 && OSProcess.exists(pid))); + } + + protected void waitForRunning() throws Exception { + Status status = spinReadStatus(); + String lastReadMessage = null; + String lastReportedMessage = null; + long lastReadTime = System.nanoTime(); + if ( status == null ) { + throw new Exception(LocalizedStrings.CacheServerLauncher_NO_AVAILABLE_STATUS.toLocalizedString()); + } else { + switch ( status.state ) { + case STARTING: + // re-read status for a while... + while( status.state == STARTING ) { + Thread.sleep( 500 ); // fix for bug 36998 + status = spinReadStatus(); + + //check to see if the status message has changed + if(status.dsMsg != null && !status.dsMsg.equals(lastReadMessage)) { + lastReadMessage = status.dsMsg; + lastReadTime = System.nanoTime(); + } + + //if the status message has not changed for 15 seconds, print + //out the message. 
+ long elapsed = System.nanoTime() - lastReadTime; + if(TimeUnit.NANOSECONDS.toMillis(elapsed) > STATUS_WAIT_TIME + && lastReadMessage != null && + !lastReadMessage.equals(lastReportedMessage)) { + long elapsedSec = TimeUnit.NANOSECONDS.toSeconds(elapsed); + System.out.println(LocalizedStrings.CacheServerLauncher_LAUNCH_IN_PROGRESS_0 + .toLocalizedString(elapsedSec, status.dsMsg)); + lastReportedMessage = lastReadMessage; + } + } + if (status.state == SHUTDOWN) { + System.out.println(status); + System.exit(1); + } + break; + default: + break; + } + System.out.println( status ); + } + } + + /** + * Reads {@link DistributedSystem#PROPERTY_FILE} and determines if the + * {@link DistributionConfig#LOG_FILE_NAME} property is set to stdout + * @return true if the logging would go to stdout + */ + private static boolean isLoggingToStdOut() { + Properties gfprops = new Properties(); + URL url = DistributedSystem.getPropertyFileURL(); + if (url != null) { + try { + gfprops.load(url.openStream()); + } catch (IOException io) { + //throw new GemFireIOException("Failed reading " + url, io); + System.out.println("Failed reading " + url); + System.exit( 1 ); + } + final String logFile = gfprops.getProperty(DistributionConfig.LOG_FILE_NAME); + if ( logFile == null || logFile.length() == 0 ) { + return true; + } + } else { + //Didnt find a property file, assuming the default is to log to stdout + return true; + } + return false; + } + + /** + * Process information contained in the options map and add to the command + * line of the subprocess as needed. 
+ */ + protected void addToServerCommand(final List commandLine, final Map options) { + final ListWrapper commandLineWrapper = new ListWrapper(commandLine); + + if (Boolean.TRUE.equals(options.get(REBALANCE))) { + commandLineWrapper.add("-rebalance"); + } + + commandLineWrapper.add((String) options.get(DISABLE_DEFAULT_SERVER)); + commandLineWrapper.add((String) options.get(SERVER_PORT)); + commandLineWrapper.add((String) options.get(SERVER_BIND_ADDRESS)); + + String criticalHeapThreshold = (String)options.get(CRITICAL_HEAP_PERCENTAGE); + if (criticalHeapThreshold != null) { + commandLineWrapper.add(criticalHeapThreshold); + } + String evictionHeapThreshold = (String)options + .get(EVICTION_HEAP_PERCENTAGE); + if (evictionHeapThreshold != null) { + commandLineWrapper.add(evictionHeapThreshold); + } + + String criticalOffHeapThreshold = (String)options.get(CRITICAL_OFF_HEAP_PERCENTAGE); + if (criticalOffHeapThreshold != null) { + commandLineWrapper.add(criticalOffHeapThreshold); + } + String evictionOffHeapThreshold = (String)options + .get(EVICTION_OFF_HEAP_PERCENTAGE); + if (evictionOffHeapThreshold != null) { + commandLineWrapper.add(evictionOffHeapThreshold); + } + + final Properties props = (Properties) options.get(PROPERTIES); + + for (final Object key : props.keySet()) { + commandLineWrapper.add(key + "=" + props.getProperty(key.toString())); + } + + if (props.getProperty(DistributionConfig.LOG_FILE_NAME) == null && CacheServerLauncher.isLoggingToStdOut()) { + // Do not allow the cache server to log to stdout; override the logger with #defaultLogFileName + commandLineWrapper.add(DistributionConfig.LOG_FILE_NAME + "=" + defaultLogFileName); + } + } + + /** + * This method is called immediately following cache creation in the spawned + * process, but prior to setting the RUNNING flag in the status file. So the + * spawning process will block until this method completes. 
*/
  protected void startAdditionalServices(final Cache cache, final Map options) throws Exception {
  }

  /**
   * This method is called prior to DistributedSystem.disconnect(). Care should
   * be taken not to take too long in this method or else
   * {@link #stop(String[])} may time out.
   */
  protected void stopAdditionalServices() throws Exception {
  }

  /**
   * A List view over another list that never stores null values: adding null leaves the
   * backing list unchanged and returns false, and setting an index to null keeps and
   * returns the element already there.
   *
   * @param <E> the Class type for the List elements.
   */
  protected static class ListWrapper<E> extends AbstractList<E> {

    // carries the outcome of add(int, E) back to add(E), which reaches it through AbstractList.add(E)
    private static final ThreadLocal<Boolean> addResult = new ThreadLocal<Boolean>();

    private final List<E> list;

    public ListWrapper(final List<E> list) {
      assert list != null : "The List cannot be null!";
      this.list = list;
    }

    /**
     * @return true if the element was appended, false if it was null and therefore ignored
     */
    @Override
    public boolean add(final E e) {
      // AbstractList.add(E) delegates to add(size(), e), which records whether it stored the element
      final boolean localAddResult = super.add(e);
      return (localAddResult && addResult.get());
    }

    @Override
    public void add(final int index, final E element) {
      if (element != null) {
        list.add(index, element);
      }
      addResult.set(element != null);
    }

    @Override
    public E get(final int index) {
      return this.list.get(index);
    }

    @Override
    public E remove(final int index) {
      return list.remove(index);
    }

    /**
     * @return the element previously at the index; for a null argument, the element that stays there
     */
    @Override
    public E set(final int index, final E element) {
      return (element != null ? list.set(index, element) : list.get(index));
    }

    @Override
    public int size() {
      return list.size();
    }
  }

  /**
   * Thread that wakes up about once a second (or when notified) and, when the latest
   * startup message differs from the one stored in the status, stores it there and
   * writes the status out.
   */
  private class MainLogReporter extends Thread implements StartupStatusListener {
    // written by setStatus without holding the monitor, read by the reporter thread
    private volatile String lastLogMessage;
    private final Status status;
    boolean running = true;

    public MainLogReporter(Status status) {
      this.status = status;
    }

    /** Stops the reporter loop, clears the status message and wakes the thread. */
    public synchronized void shutdown() {
      this.running = false;
      this.status.dsMsg = null;
      this.notifyAll();
    }


    @Override
    public void setStatus(String status) {
      lastLogMessage = status;
    }

    @Override
    public synchronized void run() {
      while(running) {
        try {
          wait(1000);
        } catch (InterruptedException e) {
          //this should not happen.
          break;
        }
        // read once so the value compared and the value stored are the same
        final String latest = lastLogMessage;
        // compare by content: an equal message in a different String object is not a change
        final boolean changed = (latest == null) ? (status.dsMsg != null) : !latest.equals(status.dsMsg);
        if(running && changed) {
          status.dsMsg = latest;
          try {
            writeStatus(status);
          } catch (IOException e) {
            //this could happen if there was a concurrent write to the file
            //eg a stop.
            continue;
          }
        }
      }
    }
  }
}