Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 72882200BA5 for ; Wed, 14 Sep 2016 00:44:15 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 71236160AD8; Tue, 13 Sep 2016 22:44:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 28513160AD3 for ; Wed, 14 Sep 2016 00:44:13 +0200 (CEST) Received: (qmail 30837 invoked by uid 500); 13 Sep 2016 22:44:12 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 30824 invoked by uid 99); 13 Sep 2016 22:44:12 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Sep 2016 22:44:12 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id C47DDCEF41 for ; Tue, 13 Sep 2016 22:44:11 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.344 X-Spam-Level: X-Spam-Status: No, score=-4.344 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.124] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id wrz2GsOuag1i for ; Tue, 13 Sep 2016 22:44:04 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id 063EC5FC62 for ; Tue, 13 Sep 2016 22:43:52 +0000 (UTC) Received: (qmail 28407 invoked by uid 99); 13 Sep 2016 22:43:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Sep 2016 22:43:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 85B0BE03E2; Tue, 13 Sep 2016 22:43:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hiteshkhamesra@apache.org To: commits@geode.incubator.apache.org Date: Tue, 13 Sep 2016 22:44:46 -0000 Message-Id: In-Reply-To: <69ace8383aae47d1b0c2dc7793e01b2a@git.apache.org> References: <69ace8383aae47d1b0c2dc7793e01b2a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [58/61] [abbrv] incubator-geode git commit: GEODE-37 change package name from com.gemstone.gemfire (for ./geode-wan/src/main/java/com/gemstone/gemfire)to org.apache.geode for(to ./geode-wan/src/main/java/org/apache/geode) archived-at: Tue, 13 Sep 2016 22:44:15 -0000 http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/AbstractRemoteGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/AbstractRemoteGatewaySender.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/AbstractRemoteGatewaySender.java new file mode 100644 index 0000000..163a611 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/AbstractRemoteGatewaySender.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.client.PoolManager; +import com.gemstone.gemfire.cache.client.internal.PoolImpl; +import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorRequest; +import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorResponse; +import com.gemstone.gemfire.cache.wan.GatewayReceiver; +import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; +import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.PoolFactoryImpl; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.net.ConnectException; +import java.util.Iterator; +import java.util.StringTokenizer; + +public abstract class AbstractRemoteGatewaySender extends AbstractGatewaySender { + private static final Logger logger = LogService.getLogger(); + + public AbstractRemoteGatewaySender() { + + } + public AbstractRemoteGatewaySender(Cache cache, GatewaySenderAttributes attrs){ + super(cache, attrs); + } + + /** used to reduce warning logs in case remote locator is down (#47634) */ + protected int proxyFailureTries = 0; + + public synchronized void initProxy() { + // return if it is being used for WBCL or proxy is already created + if (this.remoteDSId == DEFAULT_DISTRIBUTED_SYSTEM_ID || this.proxy != null + && !this.proxy.isDestroyed()) { + return; + } + + int locatorCount = 0; + PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory(); + pf.setPRSingleHopEnabled(false); + if (this.locatorDiscoveryCallback != null) { + pf.setLocatorDiscoveryCallback(locatorDiscoveryCallback); + } + pf.setReadTimeout(this.socketReadTimeout); + pf.setIdleTimeout(connectionIdleTimeOut); + pf.setSocketBufferSize(socketBufferSize); + pf.setServerGroup(GatewayReceiver.RECEIVER_GROUP); + RemoteLocatorRequest request = new RemoteLocatorRequest(this.remoteDSId, pf + .getPoolAttributes().getServerGroup()); + String locators = ((GemFireCacheImpl) this.cache).getDistributedSystem() + .getConfig().getLocators(); + if (logger.isDebugEnabled()) { + logger.debug("Gateway Sender is attempting to configure pool with remote locator information"); + } + StringTokenizer locatorsOnThisVM = new StringTokenizer(locators, ","); + while (locatorsOnThisVM.hasMoreTokens()) { + String localLocator = locatorsOnThisVM.nextToken(); + DistributionLocatorId locatorID = new DistributionLocatorId(localLocator); + try { + RemoteLocatorResponse response = (RemoteLocatorResponse) new TcpClient() + .requestToServer(locatorID.getHost(), locatorID.getPort(), request, + WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT); + + if (response != null) { + if (response.getLocators() == null) { + if (logProxyFailure()) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.AbstractGatewaySender_REMOTE_LOCATOR_FOR_REMOTE_SITE_0_IS_NOT_AVAILABLE_IN_LOCAL_LOCATOR_1, + new Object[] { remoteDSId, localLocator })); + } + continue; + } + if (logger.isDebugEnabled()) { + logger.debug("Received the remote site {} location information:", this.remoteDSId, response.getLocators()); + } + StringBuffer strBuffer = new StringBuffer(); + Iterator itr = response.getLocators().iterator(); + while (itr.hasNext()) { + DistributionLocatorId locatorId = new DistributionLocatorId(itr.next()); + pf.addLocator(locatorId.getHost().getHostName(), locatorId.getPort()); + locatorCount++; + } + break; + } + } catch (IOException ioe) { + if (logProxyFailure()) { + // don't print stack trace for connection failures + String ioeStr = ""; + if (!logger.isDebugEnabled() && ioe instanceof ConnectException) { + ioeStr = ": " + ioe.toString(); + ioe = null; + } + logger.warn(LocalizedMessage.create( + LocalizedStrings.AbstractGatewaySender_SENDER_0_IS_NOT_ABLE_TO_CONNECT_TO_LOCAL_LOCATOR_1, + new Object[] { this.id, localLocator + ioeStr }), ioe); + } + continue; + } catch (ClassNotFoundException e) { + if (logProxyFailure()) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.AbstractGatewaySender_SENDER_0_IS_NOT_ABLE_TO_CONNECT_TO_LOCAL_LOCATOR_1, + new Object[] { this.id, localLocator }), e); + } + continue; + } + } + + if (locatorCount == 0) { + if (logProxyFailure()) { + logger.fatal(LocalizedMessage.create( + LocalizedStrings.AbstractGatewaySender_SENDER_0_COULD_NOT_GET_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1, + new Object[] { this.id, this.remoteDSId })); + } + this.proxyFailureTries++; + throw new GatewaySenderConfigurationException( + LocalizedStrings.AbstractGatewaySender_SENDER_0_COULD_NOT_GET_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1 + .toLocalizedString(new Object[] { this.id, this.remoteDSId})); + } + pf.init(this); + this.proxy = ((PoolImpl) pf.create(this.getId())); + if (this.proxyFailureTries > 0) { + logger.info(LocalizedMessage.create(LocalizedStrings.AbstractGatewaySender_SENDER_0_GOT_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1, + new Object[] { this.id, this.remoteDSId, this.proxyFailureTries })); + this.proxyFailureTries = 0; + } + } + + protected boolean logProxyFailure() { + assert Thread.holdsLock(this); + // always log the first failure + if (logger.isDebugEnabled() || this.proxyFailureTries == 0) { + return true; + } else { + // subsequent failures will be logged on 30th, 300th, 3000th try + // each try is at 100millis from higher layer so this accounts for logging + // after 3s, 30s and then every 5mins + if (this.proxyFailureTries >= 3000) { + return (this.proxyFailureTries % 3000) == 0; + } else { + return (this.proxyFailureTries == 30 || this.proxyFailureTries == 300); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java new file mode 100644 index 0000000..c5c2a2f --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.wan.GatewayReceiver; +import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; +import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.distributed.internal.ResourceEvent; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation; +import com.gemstone.gemfire.internal.cache.xmlcache.GatewayReceiverCreation; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; + +/** + * + * @since GemFire 7.0 + */ +public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory { + + private int startPort = GatewayReceiver.DEFAULT_START_PORT; + + private int endPort = GatewayReceiver.DEFAULT_END_PORT; + + private int timeBetPings = GatewayReceiver.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS; + + private int socketBuffSize = GatewayReceiver.DEFAULT_SOCKET_BUFFER_SIZE; + + private String bindAdd= GatewayReceiver.DEFAULT_BIND_ADDRESS; + + private String hostnameForSenders = GatewayReceiver.DEFAULT_HOSTNAME_FOR_SENDERS; + + private boolean manualStart = GatewayReceiver.DEFAULT_MANUAL_START; + + private List filters = new ArrayList(); + + private Cache cache; + + public GatewayReceiverFactoryImpl() { + + } + public GatewayReceiverFactoryImpl(Cache cache) { + this.cache = cache; + } + + public GatewayReceiverFactory addGatewayTransportFilter( + GatewayTransportFilter filter) { + this.filters.add(filter); + return this; + } + + public GatewayReceiverFactory removeGatewayTransportFilter( + GatewayTransportFilter filter) { + this.filters.remove(filter); + return this; + } + + public GatewayReceiverFactory setMaximumTimeBetweenPings(int time) { + this.timeBetPings = time; + return this; + } + + public GatewayReceiverFactory setStartPort(int port) { + this.startPort = port; + return this; + } + + public GatewayReceiverFactory setEndPort(int port) { + this.endPort = port; + return this; + } + + public GatewayReceiverFactory setSocketBufferSize(int size) { + this.socketBuffSize = size; + return this; + } + + public GatewayReceiverFactory setBindAddress(String address) { + this.bindAdd = address; + return this; + } + + public GatewayReceiverFactory setHostnameForSenders(String address) { + this.hostnameForSenders = address; + return this; + } + + public GatewayReceiverFactory setManualStart(boolean start) { + this.manualStart = start; + return this; + } + + public GatewayReceiver create() { + if (this.startPort > this.endPort) { + throw new IllegalStateException( + "Please specify either start port a value which is less than end port."); + } + GatewayReceiver recv = null; + if (this.cache instanceof GemFireCacheImpl) { + recv = new GatewayReceiverImpl(this.cache, this.startPort, this.endPort, + this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters, + this.hostnameForSenders, this.manualStart); + ((GemFireCacheImpl)cache).addGatewayReceiver(recv); + InternalDistributedSystem system = (InternalDistributedSystem) this.cache + .getDistributedSystem(); + system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_CREATE, recv); + if (!this.manualStart) { + try { + recv.start(); + } + catch (IOException ioe) { + throw new GatewayReceiverException( + LocalizedStrings.GatewayReceiver_EXCEPTION_WHILE_STARTING_GATEWAY_RECEIVER + .toLocalizedString(), ioe); + } + } + } else if (this.cache instanceof CacheCreation) { + recv = new GatewayReceiverCreation(this.cache, this.startPort, this.endPort, + this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters, + this.hostnameForSenders, this.manualStart); + ((CacheCreation)cache).addGatewayReceiver(recv); + } + return recv; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java new file mode 100644 index 0000000..b8768d4 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan; + +import java.io.IOException; +import java.net.BindException; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.List; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.server.CacheServer; +import com.gemstone.gemfire.cache.wan.GatewayReceiver; +import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.distributed.internal.ResourceEvent; +import com.gemstone.gemfire.internal.AvailablePort; +import com.gemstone.gemfire.internal.net.SocketCreator; +import com.gemstone.gemfire.internal.cache.CacheServerImpl; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; + +/** + * @since GemFire 7.0 + */ +@SuppressWarnings("deprecation") +public class GatewayReceiverImpl implements GatewayReceiver { + + private static final Logger logger = LogService.getLogger(); + + private String host; + + private int startPort; + + private int endPort; + + private int port; + + private int timeBetPings; + + private int socketBufferSize; + + private boolean manualStart; + + private final List filters; + + private String bindAdd; + + private CacheServer receiver; + + private final GemFireCacheImpl cache; + + public GatewayReceiverImpl(Cache cache, int startPort, + int endPort, int timeBetPings, int buffSize, String bindAdd, + List filters, String hostnameForSenders, boolean manualStart) { + this.cache = (GemFireCacheImpl)cache; + + /* + * If user has set hostNameForSenders then it should take precedence over + * bindAddress. If user hasn't set either hostNameForSenders or bindAddress + * then getLocalHost().getHostName() should be used. + */ + if (hostnameForSenders == null || hostnameForSenders.isEmpty()) { + if (bindAdd == null || bindAdd.isEmpty()) { + try { + logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiverImpl_USING_LOCAL_HOST)); + this.host = SocketCreator.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + throw new IllegalStateException( + LocalizedStrings.GatewayReceiverImpl_COULD_NOT_GET_HOST_NAME + .toLocalizedString(), + e); + } + } else { + this.host = bindAdd; + } + } else { + this.host = hostnameForSenders; + } + + this.startPort = startPort; + this.endPort = endPort; + this.timeBetPings = timeBetPings; + this.socketBufferSize = buffSize; + this.bindAdd = bindAdd; + this.filters = filters; + this.manualStart = manualStart; + } + + public List getGatewayTransportFilters() { + return this.filters; + } + + public int getMaximumTimeBetweenPings() { + return this.timeBetPings; + } + + public int getPort() { + return this.port; + } + + public int getStartPort() { + return this.startPort; + } + + public int getEndPort() { + return this.endPort; + } + + public int getSocketBufferSize() { + return this.socketBufferSize; + } + + public boolean isManualStart() { + return this.manualStart; + } + + public CacheServer getServer() { + return receiver; + } + + public void start() throws IOException { + if (receiver == null) { + receiver = this.cache.addCacheServer(true); + } + if (receiver.isRunning()) { + logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_IS_ALREADY_RUNNING)); + return; + } + boolean started = false; + this.port = getPortToStart(); + while (!started && this.port != -1) { + receiver.setPort(this.port); + receiver.setSocketBufferSize(socketBufferSize); + receiver.setMaximumTimeBetweenPings(timeBetPings); + receiver.setHostnameForClients(host); + receiver.setBindAddress(bindAdd); + receiver.setGroups(new String[] { GatewayReceiverImpl.RECEIVER_GROUP }); + ((CacheServerImpl)receiver).setGatewayTransportFilter(this.filters); + try { + ((CacheServerImpl)receiver).start(); + started = true; + } catch (BindException be) { + if (be.getCause() != null + && be.getCause().getMessage() + .contains("assign requested address")) { + throw new GatewayReceiverException( + LocalizedStrings.SocketCreator_FAILED_TO_CREATE_SERVER_SOCKET_ON_0_1 + .toLocalizedString(new Object[] { bindAdd, + Integer.valueOf(this.port) })); + } + // ignore as this port might have been used by other threads. + logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port)); + this.port = getPortToStart(); + } catch (SocketException se) { + if (se.getMessage().contains("Address already in use")) { + logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port)); + this.port = getPortToStart(); + + } else { + throw se; + } + } + + } + if (!started) { + throw new IllegalStateException( + "No available free port found in the given range."); + } + logger.info(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_STARTED_ON_PORT, this.port)); + + InternalDistributedSystem system = this.cache.getDistributedSystem(); + system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_START, this); + + } + + private int getPortToStart(){ + // choose a random port from the given port range + int rPort; + if (this.startPort == this.endPort) { + rPort = this.startPort; + } else { + rPort = AvailablePort.getRandomAvailablePortInRange(this.startPort, + this.endPort, AvailablePort.SOCKET); + } + return rPort; + } + + public void stop() { + if(!isRunning()){ + throw new GatewayReceiverException(LocalizedStrings.GatewayReceiver_IS_NOT_RUNNING.toLocalizedString()); + } + receiver.stop(); + +// InternalDistributedSystem system = ((GemFireCacheImpl) this.cache) +// .getDistributedSystem(); +// system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_STOP, this); + + } + + public String getHost() { + return this.host; + } + + public String getBindAddress() { + return this.bindAdd; + } + + public boolean isRunning() { + if (this.receiver != null) { + return this.receiver.isRunning(); + } + return false; + } + + public String toString() { + return new StringBuffer() + .append("Gateway Receiver") + .append("@").append(Integer.toHexString(hashCode())) + .append(" [") + .append("host='").append(getHost()) + .append("'; port=").append(getPort()) + .append("; bindAddress=").append(getBindAddress()) + .append("; maximumTimeBetweenPings=").append(getMaximumTimeBetweenPings()) + .append("; socketBufferSize=").append(getSocketBufferSize()) + .append("; isManualStart=").append(isManualStart()) + .append("; group=").append(Arrays.toString(new String[]{GatewayReceiverImpl.RECEIVER_GROUP})) + .append("]") + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java new file mode 100644 index 0000000..d2302c4 --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java @@ -0,0 +1,802 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan; + + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import com.gemstone.gemfire.GemFireIOException; +import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException; +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.CancelException; +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.RegionDestroyedException; +import com.gemstone.gemfire.cache.client.ServerConnectivityException; +import com.gemstone.gemfire.cache.client.ServerOperationException; +import com.gemstone.gemfire.cache.client.internal.Connection; +import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException; +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.distributed.internal.ServerLocation; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; +import com.gemstone.gemfire.pdx.PdxRegistryMismatchException; +import com.gemstone.gemfire.security.GemFireSecurityException; +import com.gemstone.gemfire.cache.client.internal.SenderProxy; + +/** + * @since GemFire 7.0 + * + */ +public class GatewaySenderEventRemoteDispatcher implements + GatewaySenderEventDispatcher { + + private static final Logger logger = LogService.getLogger(); + + private final AbstractGatewaySenderEventProcessor processor; + + private volatile Connection connection; + + private final Set notFoundRegions = new HashSet(); + + private final Object notFoundRegionsSync = new Object(); + + private final AbstractGatewaySender sender; + + private AckReaderThread ackReaderThread; + + private ReentrantReadWriteLock connectionLifeCycleLock = new ReentrantReadWriteLock(); + + /** + * This count is reset to 0 each time a successful connection is made. + */ + private int failedConnectCount = 0; + + public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor eventProcessor) { + this.processor = eventProcessor; + this.sender = eventProcessor.getSender(); +// this.ackReaderThread = new AckReaderThread(sender); + try { + initializeConnection(); + } + catch (GatewaySenderException e) { + if (e.getCause() instanceof GemFireSecurityException) { + throw e; + } + } + } + + protected GatewayAck readAcknowledgement() { + SenderProxy sp = new SenderProxy(this.processor.getSender().getProxy()); + GatewayAck ack = null; + Exception ex; + try { + connection = getConnection(false); + if (logger.isDebugEnabled()) { + logger.debug(" Receiving ack on the thread {}", connection); + } + this.connectionLifeCycleLock.readLock().lock(); + try { + if (connection != null) { + ack = (GatewayAck)sp.receiveAckFromReceiver(connection); + } + } finally { + this.connectionLifeCycleLock.readLock().unlock(); + } + + } catch (Exception e) { + Throwable t = e.getCause(); + if (t instanceof BatchException70) { + // A BatchException has occurred. + // Do not process the connection as dead since it is not dead. + ex = (BatchException70)t; + } else if (e instanceof GatewaySenderException) { //This Exception is thrown from getConnection + ex = (Exception) e.getCause(); + }else { + ex = e; + // keep using the connection if we had a batch exception. Else, destroy + // it + destroyConnection(); + } + if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) { + // if our pool is shutdown then just be silent + } else if (ex instanceof IOException + || (ex instanceof ServerConnectivityException && !(ex.getCause() instanceof PdxRegistryMismatchException)) + || ex instanceof ConnectionDestroyedException) { + // If the cause is an IOException or a ServerException, sleep and retry. + // Sleep for a bit and recheck. + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } else { + if (!(ex instanceof CancelException)) { + logger.fatal(LocalizedMessage.create( + LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH), + ex); + } + this.processor.setIsStopped(true); + } + } + return ack; + } + + @Override + public boolean dispatchBatch(List events, boolean isRetry) { + GatewaySenderStats statistics = this.sender.getStatistics(); + boolean success = false; + try { + long start = statistics.startTime(); + success =_dispatchBatch(events, isRetry); + if (success) { + statistics.endBatch(start, events.size()); + } + } catch (GatewaySenderException ge) { + + Throwable t = ge.getCause(); + if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) { + // if our pool is shutdown then just be silent + } else if (t instanceof IOException + || t instanceof ServerConnectivityException + || t instanceof ConnectionDestroyedException + || t instanceof MessageTooLargeException + || t instanceof IllegalStateException) { + this.processor.handleException(); + // If the cause is an IOException or a ServerException, sleep and retry. + // Sleep for a bit and recheck. + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + if (logger.isDebugEnabled()) { + logger.debug("Because of IOException, failed to dispatch a batch with id : {}", this.processor.getBatchId()); + } + } + else { + logger.fatal(LocalizedMessage.create( + LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH), ge); + this.processor.setIsStopped(true); + } + } + catch (CancelException e) { + if (logger.isDebugEnabled()) { + logger.debug("Stopping the processor because cancellation occurred while processing a batch"); + } + this.processor.setIsStopped(true); + throw e; + } catch (Exception e) { + this.processor.setIsStopped(true); + logger.fatal(LocalizedMessage.create( + LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH), + e); + } + return success; + } + + private boolean _dispatchBatch(List events, boolean isRetry) { + Exception ex = null; + int currentBatchId = this.processor.getBatchId(); + connection = getConnection(true); + int batchIdForThisConnection = this.processor.getBatchId(); + GatewaySenderStats statistics = this.sender.getStatistics(); + // This means we are writing to a new connection than the previous batch. + // i.e The connection has been reset. It also resets the batchId. + if (currentBatchId != batchIdForThisConnection + || this.processor.isConnectionReset()) { + return false; + } + try { + if (this.processor.isConnectionReset()) { + isRetry = true; + } + SenderProxy sp = new SenderProxy(this.sender.getProxy()); + this.connectionLifeCycleLock.readLock().lock(); + try { + if (connection != null) { + sp.dispatchBatch_NewWAN(connection, events, currentBatchId, isRetry); + if (logger.isDebugEnabled()) { + logger.debug("{} : Dispatched batch (id={}) of {} events, queue size: {} on connection {}", + this.processor.getSender(), currentBatchId, events.size(), this.processor.getQueue().size(), connection); + } + } else { + throw new ConnectionDestroyedException(); + } + } + finally{ + this.connectionLifeCycleLock.readLock().unlock(); + } + return true; + } + catch (ServerOperationException e) { + Throwable t = e.getCause(); + if (t instanceof BatchException70) { + // A BatchException has occurred. + // Do not process the connection as dead since it is not dead. + ex = (BatchException70)t; + } + else { + ex = e; + // keep using the connection if we had a batch exception. Else, destroy it + destroyConnection(); + } + throw new GatewaySenderException( + LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString( + new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex); + } + catch (GemFireIOException e) { + Throwable t = e.getCause(); + if (t instanceof MessageTooLargeException) { + // A MessageTooLargeException has occurred. + // Do not process the connection as dead since it is not dead. + ex = (MessageTooLargeException)t; + // Reduce the batch size by half of the configured batch size or number of events in the current batch (whichever is less) + int newBatchSize = Math.min(events.size(), this.processor.getBatchSize())/2; + logger.warn(LocalizedMessage.create( + LocalizedStrings.GatewaySenderEventRemoteDispatcher_MESSAGE_TOO_LARGE_EXCEPTION, new Object[] { events.size(), newBatchSize }), e); + this.processor.setBatchSize(newBatchSize); + statistics.incBatchesResized(); + } + else { + ex = e; + // keep using the connection if we had a MessageTooLargeException. Else, destroy it + destroyConnection(); + } + throw new GatewaySenderException( + LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString( + new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex); + } + catch (IllegalStateException e) { + this.processor.setException(new GatewaySenderException(e)); + throw new GatewaySenderException( + LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString( + new Object[] {this, Integer.valueOf(currentBatchId), connection}), e); + } + catch (Exception e) { + // An Exception has occurred. Get its cause. + Throwable t = e.getCause(); + if (t instanceof IOException) { + // An IOException has occurred. + ex = (IOException)t; + } else { + ex = e; + } + //the cause is not going to be BatchException70. So, destroy the connection + destroyConnection(); + + throw new GatewaySenderException( + LocalizedStrings.GatewayEventRemoteDispatcher_0_EXCEPTION_DURING_PROCESSING_BATCH_1_ON_CONNECTION_2.toLocalizedString( + new Object[] {this, Integer.valueOf(currentBatchId), connection}), ex); + } + } + + /** + * Acquires or adds a new Connection to the corresponding + * Gateway + * + * @return the Connection + * + * @throws GatewaySenderException + */ + public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException{ + if (this.processor.isStopped()) { + return null; + } + // IF the connection is null + // OR the connection's ServerLocation doesn't match with the one stored in sender + // THEN initialize the connection + if(!this.sender.isParallel()) { + if (this.connection == null || this.connection.isDestroyed() + || !this.connection.getServer().equals(this.sender.getServerLocation())) { + if (logger.isDebugEnabled()) { + logger.debug("Initializing new connection as serverLocation of old connection is : {} and the serverLocation to connect is {}", + ((this.connection == null) ? "null" : this.connection.getServer()), + this.sender.getServerLocation()); + } + // Initialize the connection + initializeConnection(); + } + } else { + if (this.connection == null || this.connection.isDestroyed()) { + initializeConnection(); + } + } + + // Here we might wait on a connection to another server if I was secondary + // so don't start waiting until I am primary + Cache cache = this.sender.getCache(); + if (cache != null && !cache.isClosed()) { + if (this.sender.isPrimary() && (this.connection != null)) { + if (this.ackReaderThread == null || !this.ackReaderThread.isRunning()) { + this.ackReaderThread = new AckReaderThread(this.sender, this.processor); + this.ackReaderThread.start(); + this.ackReaderThread.waitForRunningAckReaderThreadRunningState(); + } + } + } + return this.connection; + } + + public void destroyConnection() { + this.connectionLifeCycleLock.writeLock().lock(); + try { + Connection con = this.connection; + if (con != null) { + if (!con.isDestroyed()) { + con.destroy(); + this.sender.getProxy().returnConnection(con); + } + + // Reset the connection so the next time through a new one will be + // obtained + this.connection = null; + this.sender.setServerLocation(null); + } + } + finally { + this.connectionLifeCycleLock.writeLock().unlock(); + } + } + + /** + * Initializes the Connection. + * + * @throws GatewaySenderException + */ + private void initializeConnection() throws GatewaySenderException, + GemFireSecurityException { + this.connectionLifeCycleLock.writeLock().lock(); + try { + // Attempt to acquire a connection + if (this.sender.getProxy() == null + || this.sender.getProxy().isDestroyed()) { + this.sender.initProxy(); + } else { + this.processor.resetBatchId(); + } + Connection con; + try { + if (this.sender.isParallel()) { + /* + * TODO - The use of acquireConnection should be removed + * from the gateway code. This method is fine for tests, + * but these connections should really be managed inside + * the pool code. If the gateway needs to persistent connection + * to a single server, which should create have the OpExecutor + * that holds a reference to the connection (similar to the way + * we do with thread local connections). + * Use {@link ExecutablePool#setupServerAffinity(boolean)} for + * gateway code + */ + con = this.sender.getProxy().acquireConnection(); + // For parallel sender, setting server location will not matter. + // everytime it will ask for acquire connection whenever it needs it. I + // am saving this server location for command purpose + sender.setServerLocation(con.getServer()); + } else { + synchronized (this.sender + .getLockForConcurrentDispatcher()) { + ServerLocation server = this.sender.getServerLocation(); + if (server != null) { + if (logger.isDebugEnabled()) { + logger.debug("ServerLocation is: {}. Connecting to this serverLocation...", server); + } + con = this.sender.getProxy().acquireConnection(server); + } else { + if (logger.isDebugEnabled()) { + logger.debug("ServerLocation is null. Creating new connection. "); + } + con = this.sender.getProxy().acquireConnection(); + // Acquired connection from pool!! Update the server location + // information in the sender and + // distribute the information to other senders ONLY IF THIS SENDER + // IS + // PRIMARY + if (this.sender.isPrimary()) { + if (sender.getServerLocation() == null) { + sender.setServerLocation(con.getServer()); + } + new UpdateAttributesProcessor(this.sender).distribute(false); + } + } + } + } + } catch (ServerConnectivityException e) { + this.failedConnectCount++; + Throwable ex = null; + + if (e.getCause() instanceof GemFireSecurityException) { + ex = e.getCause(); + if (logConnectionFailure()) { + // only log this message once; another msg is logged once we connect + logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1, + new Object[] { this.processor.getSender().getId(), ex.getMessage() })); + } + throw new GatewaySenderException(ex); + } + List servers = this.sender.getProxy() + .getCurrentServers(); + String ioMsg = null; + if (servers.size() == 0) { + ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_THERE_ARE_NO_ACTIVE_SERVERS + .toLocalizedString(); + } else { + final StringBuilder buffer = new StringBuilder(); + for (ServerLocation server : servers) { + String endpointName = String.valueOf(server); + if (buffer.length() > 0) { + buffer.append(", "); + } + buffer.append(endpointName); + } + ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_NO_AVAILABLE_CONNECTION_WAS_FOUND_BUT_THE_FOLLOWING_ACTIVE_SERVERS_EXIST_0 + .toLocalizedString(buffer.toString()); + } + ex = new IOException(ioMsg); + // Set the serverLocation to null so that a new connection can be + // obtained in next attempt + this.sender.setServerLocation(null); + if (this.failedConnectCount == 1) { + // only log this message once; another msg is logged once we connect + logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT, + this.processor.getSender().getId())); + + } + // Wrap the IOException in a GatewayException so it can be processed the + // same as the other exceptions that might occur in sendBatch. + throw new GatewaySenderException( + LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT + .toLocalizedString(this.processor.getSender().getId()), ex); + } + if (this.failedConnectCount > 0) { + Object[] logArgs = new Object[] { this.processor.getSender().getId(), + con, Integer.valueOf(this.failedConnectCount) }; + logger.info(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1_AFTER_2_FAILED_CONNECT_ATTEMPTS, + logArgs)); + this.failedConnectCount = 0; + } else { + Object[] logArgs = new Object[] { this.processor.getSender().getId(), + con }; + logger.info(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1, logArgs)); + } + this.connection = con; + this.processor.checkIfPdxNeedsResend(this.connection.getQueueStatus().getPdxSize()); + } + catch (ConnectionDestroyedException e) { + throw new GatewaySenderException( + LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT.toLocalizedString(this.processor + .getSender().getId()), e); + } + finally { + this.connectionLifeCycleLock.writeLock().unlock(); + } + } + + protected boolean logConnectionFailure() { + // always log the first failure + if (logger.isDebugEnabled() || this.failedConnectCount == 0) { + return true; + } + else { + // subsequent failures will be logged on 30th, 300th, 3000th try + // each try is at 100millis from higher layer so this accounts for logging + // after 3s, 30s and then every 5mins + if (this.failedConnectCount >= 3000) { + return (this.failedConnectCount % 3000) == 0; + } + else { + return (this.failedConnectCount == 30 || this.failedConnectCount == 300); + } + } + } + + public static class GatewayAck { + private int batchId; + + private int numEvents; + + private BatchException70 be; + + public GatewayAck(BatchException70 be, int bId) { + this.be = be; + this.batchId = bId; + } + + public GatewayAck(int batchId, int numEvents) { + this.batchId = batchId; + this.numEvents = numEvents; + } + + /** + * @return the numEvents + */ + public int getNumEvents() { + return numEvents; + } + + /** + * @return the batchId + */ + public int getBatchId() { + return batchId; + } + + public BatchException70 getBatchException() { + return this.be; + } + } + + class AckReaderThread extends Thread { + + private Object runningStateLock = new Object(); + + /** + * boolean to make a shutdown request + */ + private volatile boolean shutdown = false; + + private final GemFireCacheImpl cache; + + private volatile boolean ackReaderThreadRunning = false; + + public AckReaderThread(GatewaySender sender, AbstractGatewaySenderEventProcessor processor) { + super("AckReaderThread for : " + processor.getName()); + this.setDaemon(true); + this.cache = (GemFireCacheImpl)((AbstractGatewaySender)sender).getCache(); + } + + public void waitForRunningAckReaderThreadRunningState() { + synchronized (runningStateLock) { + while (!this.ackReaderThreadRunning) { + try { + this.runningStateLock.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + } + + private boolean checkCancelled() { + if (shutdown) { + return true; + } + + if (cache.getCancelCriterion().isCancelInProgress()) { + return true; + } + return false; + } + + @Override + public void run() { + if (logger.isDebugEnabled()) { + logger.debug("AckReaderThread started.. "); + } + + synchronized (runningStateLock) { + ackReaderThreadRunning = true; + this.runningStateLock.notifyAll(); + } + + try { + for (;;) { + if (checkCancelled()) { + break; + } + GatewayAck ack = readAcknowledgement(); + if (ack != null) { + boolean gotBatchException = ack.getBatchException() != null; + int batchId = ack.getBatchId(); + int numEvents = ack.getNumEvents(); + + // If the batch is successfully processed, remove it from the + // queue. + if (gotBatchException) { + logger.info(LocalizedMessage.create( + LocalizedStrings.GatewaySenderEventRemoteDispatcher_GATEWAY_SENDER_0_RECEIVED_ACK_FOR_BATCH_ID_1_WITH_EXCEPTION, + new Object[] { processor.getSender(), ack.getBatchId() }, ack.getBatchException())); + // If we get PDX related exception in the batch exception then try + // to resend all the pdx events as well in the next batch. + final GatewaySenderStats statistics = sender.getStatistics(); + statistics.incBatchesRedistributed(); + // log batch exceptions and remove all the events if remove from + // exception is true + // do not remove if it is false + logBatchExceptions(ack.getBatchException()); + processor.handleSuccessBatchAck(batchId); + + } // unsuccessful batch + else { // The batch was successful. + if (logger.isDebugEnabled()) { + logger.debug("Gateway Sender {} : Received ack for batch id {} of {} events", + processor.getSender(), ack.getBatchId(), ack.getNumEvents()); + } + processor.handleSuccessBatchAck(batchId); + } + } else { + // If we have received IOException. + if (logger.isDebugEnabled()) { + logger.debug("{}: Received null ack from remote site.", processor.getSender()); + } + processor.handleException(); + try { // This wait is before trying to getting new connection to + // receive ack. Without this there will be continuous call to + // getConnection + Thread.sleep(GatewaySender.CONNECTION_RETRY_INTERVAL); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } catch (Exception e) { + if (!checkCancelled()) { + logger.fatal(LocalizedMessage.create( + LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH) ,e); + } + sender.getLifeCycleLock().writeLock().lock(); + try { + processor.stopProcessing(); + sender.clearTempEventsAfterSenderStopped(); + } finally { + sender.getLifeCycleLock().writeLock().unlock(); + } + // destroyConnection(); + } finally { + if (logger.isDebugEnabled()) { + logger.debug("AckReaderThread exiting. "); + } + ackReaderThreadRunning = false; + } + + } + + /** + * @param exception + * + */ + private void logBatchExceptions(BatchException70 exception) { + for (BatchException70 be : exception.getExceptions()) { + boolean logWarning = true; + if (be.getCause() instanceof RegionDestroyedException) { + RegionDestroyedException rde = (RegionDestroyedException)be + .getCause(); + synchronized (notFoundRegionsSync) { + if (notFoundRegions.contains(rde.getRegionFullPath())) { + logWarning = false; + } else { + notFoundRegions.add(rde.getRegionFullPath()); + } + } + } else if (be.getCause() instanceof IllegalStateException + && be.getCause().getMessage().contains("Unknown pdx type")) { + List pdxEvents = processor + .getBatchIdToPDXEventsMap().get(be.getBatchId()); + if (logWarning) { + logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_PDX_EVENT__0, + be.getIndex()), be); + } + if (pdxEvents != null) { + for (GatewaySenderEventImpl senderEvent : pdxEvents) { + senderEvent.isAcked = false; + } + GatewaySenderEventImpl gsEvent = pdxEvents.get(be.getIndex()); + if (logWarning) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, gsEvent)); + } + } + continue; + } + if (logWarning) { + logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayEventRemoteDispatcher_A_BATCHEXCEPTION_OCCURRED_PROCESSING_EVENT__0, + be.getIndex()), be); + } + List[] eventsArr = processor.getBatchIdToEventsMap().get(be.getBatchId()); + if (eventsArr != null) { + List filteredEvents = eventsArr[1]; + GatewaySenderEventImpl gsEvent = (GatewaySenderEventImpl)filteredEvents + .get(be.getIndex()); + if (logWarning) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.GatewayEventRemoteDispatcher_THE_EVENT_BEING_PROCESSED_WHEN_THE_BATCHEXCEPTION_OCCURRED_WAS__0, gsEvent)); + } + } + } + } + + boolean isRunning() { + return this.ackReaderThreadRunning; + } + + public void shutdown() { + // we need to destroy connection irrespective of we are listening on it or + // not. No need to take lock as the reader thread may be blocked and we might not + // get chance to destroy unless that returns. + if (connection != null) { + Connection conn = connection; + shutDownAckReaderConnection(); + if (!conn.isDestroyed()) { + conn.destroy(); + sender.getProxy().returnConnection(conn); + } + } + this.shutdown = true; + boolean interrupted = Thread.interrupted(); + try { + this.join(15 * 1000); + } catch (InterruptedException e) { + interrupted = true; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + if (this.isAlive()) { + logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION)); + } + } + + private void shutDownAckReaderConnection() { + Connection conn = connection; + //attempt to unblock the ackReader thread by shutting down the inputStream, if it was stuck on a read + try { + if (conn != null && conn.getInputStream() != null) { + conn.getInputStream().close(); + } + } catch (IOException e) { + logger.warn("Unable to shutdown AckReaderThread Connection"); + } catch (ConnectionDestroyedException e) { + logger.info("AckReader shutting down and connection already destroyed"); + } + + } + } + + public void stopAckReaderThread() { + if (this.ackReaderThread != null) { + this.ackReaderThread.shutdown(); + } + } + + @Override + public boolean isRemoteDispatcher() { + return true; + } + + @Override + public boolean isConnectedToRemote() { + return connection != null; + } + + public void stop() { + stopAckReaderThread(); + if(this.processor.isStopped()) { + destroyConnection(); + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java new file mode 100644 index 0000000..4974c6f --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; +import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallback; +import com.gemstone.gemfire.cache.wan.GatewayEventFilter; +import com.gemstone.gemfire.cache.wan.GatewayEventSubstitutionFilter; +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; +import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderImpl; +import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderImpl; +import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation; +import com.gemstone.gemfire.internal.cache.xmlcache.ParallelGatewaySenderCreation; +import com.gemstone.gemfire.internal.cache.xmlcache.SerialGatewaySenderCreation; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; + +/** + * + * @since GemFire 7.0 + * + */ +public class GatewaySenderFactoryImpl implements + InternalGatewaySenderFactory { + + private static final Logger logger = LogService.getLogger(); + + /** + * Used internally to pass the attributes from this factory to the real + * GatewaySender it is creating. + */ + private GatewaySenderAttributes attrs = new GatewaySenderAttributes(); + + private Cache cache; + + private static final AtomicBoolean GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED = new AtomicBoolean(false); + + public GatewaySenderFactoryImpl(Cache cache) { + this.cache = cache; + } + + public GatewaySenderFactory setParallel(boolean isParallel){ + this.attrs.isParallel = isParallel; + return this; + } + + public GatewaySenderFactory setForInternalUse(boolean isForInternalUse) { + this.attrs.isForInternalUse = isForInternalUse; + return this; + } + + public GatewaySenderFactory addGatewayEventFilter( + GatewayEventFilter filter) { + this.attrs.addGatewayEventFilter(filter); + return this; + } + + public GatewaySenderFactory addGatewayTransportFilter( + GatewayTransportFilter filter) { + this.attrs.addGatewayTransportFilter(filter); + return this; + } + + public GatewaySenderFactory addAsyncEventListener( + AsyncEventListener listener) { + this.attrs.addAsyncEventListener(listener); + return this; + } + + public GatewaySenderFactory setSocketBufferSize(int socketBufferSize) { + this.attrs.socketBufferSize = socketBufferSize; + return this; + } + + public GatewaySenderFactory setSocketReadTimeout(int socketReadTimeout) { + this.attrs.socketReadTimeout = socketReadTimeout; + return this; + } + + public GatewaySenderFactory setDiskStoreName(String diskStoreName) { + this.attrs.diskStoreName = diskStoreName; + return this; + } + + public GatewaySenderFactory setMaximumQueueMemory(int maximumQueueMemory) { + this.attrs.maximumQueueMemory = maximumQueueMemory; + return this; + } + + public GatewaySenderFactory setBatchSize(int batchSize) { + this.attrs.batchSize = batchSize; + return this; + } + + public GatewaySenderFactory setBatchTimeInterval(int batchTimeInterval) { + this.attrs.batchTimeInterval = batchTimeInterval; + return this; + } + + public GatewaySenderFactory setBatchConflationEnabled( + boolean enableBatchConflation) { + this.attrs.isBatchConflationEnabled = enableBatchConflation; + return this; + } + + public GatewaySenderFactory setPersistenceEnabled( + boolean enablePersistence) { + this.attrs.isPersistenceEnabled = enablePersistence; + return this; + } + + public GatewaySenderFactory setAlertThreshold(int threshold) { + this.attrs.alertThreshold = threshold; + return this; + } + + public GatewaySenderFactory setManualStart(boolean start) { + this.attrs.manualStart = start; + return this; + } + + public GatewaySenderFactory setLocatorDiscoveryCallback( + LocatorDiscoveryCallback locCallback) { + this.attrs.locatorDiscoveryCallback = locCallback; + return this; + } + + @Override + public GatewaySenderFactory setDiskSynchronous(boolean isSynchronous) { + this.attrs.isDiskSynchronous = isSynchronous; + return this; + } + + @Override + public GatewaySenderFactory setDispatcherThreads(int numThreads) { + if ((numThreads > 1) && this.attrs.policy == null) { + this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY; + } + this.attrs.dispatcherThreads = numThreads; + return this; + } + + public GatewaySenderFactory setParallelFactorForReplicatedRegion(int parallel) { + this.attrs.parallelism = parallel; + this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY; + return this; + } + + @Override + public GatewaySenderFactory setOrderPolicy(OrderPolicy policy) { + this.attrs.policy = policy; + return this; + } + + public GatewaySenderFactory setBucketSorted(boolean isBucketSorted){ + this.attrs.isBucketSorted = isBucketSorted; + return this; + } + public GatewaySender create(String id, int remoteDSId) { + int myDSId = InternalDistributedSystem.getAnyInstance() + .getDistributionManager().getDistributedSystemId(); + if (remoteDSId == myDSId) { + throw new GatewaySenderException( + LocalizedStrings.GatewaySenderImpl_GATEWAY_0_CANNOT_BE_CREATED_WITH_REMOTE_SITE_ID_EQUAL_TO_THIS_SITE_ID + .toLocalizedString(id)); + } + if (remoteDSId < 0) { + throw new GatewaySenderException( + LocalizedStrings.GatewaySenderImpl_GATEWAY_0_CANNOT_BE_CREATED_WITH_REMOTE_SITE_ID_LESS_THAN_ZERO + .toLocalizedString(id)); + } + this.attrs.id = id; + this.attrs.remoteDs = remoteDSId; + GatewaySender sender = null; + + if(this.attrs.getDispatcherThreads() <= 0){ + throw new GatewaySenderException( + LocalizedStrings.GatewaySenderImpl_GATEWAY_SENDER_0_CANNOT_HAVE_DISPATCHER_THREADS_LESS_THAN_1 + .toLocalizedString(id)); + } + + // Verify socket read timeout if a proper logger is available + if (this.cache instanceof GemFireCacheImpl) { + // If socket read timeout is less than the minimum, log a warning. + // Ideally, this should throw a GatewaySenderException, but wan dunit tests + // were failing, and we were running out of time to change them. + if (this.attrs.getSocketReadTimeout() != 0 + && this.attrs.getSocketReadTimeout() < GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT) { + logger.warn(LocalizedMessage.create(LocalizedStrings.Gateway_CONFIGURED_SOCKET_READ_TIMEOUT_TOO_LOW, + new Object[] { "GatewaySender " + id, this.attrs.getSocketReadTimeout(), GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT })); + this.attrs.socketReadTimeout = GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT; + } + + // Log a warning if the old system property is set. + if (GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED.compareAndSet(false, true)) { + if (System.getProperty(GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY) != null) { + logger.warn(LocalizedMessage.create(LocalizedStrings.Gateway_OBSOLETE_SYSTEM_POPERTY, + new Object[] { GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY, "GatewaySender socket read timeout" })); + } + } + } + + if (this.attrs.isParallel()) { +// if(this.attrs.getDispatcherThreads() != 1){ +// throw new GatewaySenderException( +// LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_DISPATCHER_THREADS_OTHER_THAN_1 +// .toLocalizedString(id)); +// } + if ((this.attrs.getOrderPolicy() != null) + && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) { + throw new GatewaySenderException( + LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1 + .toLocalizedString(id, this.attrs.getOrderPolicy())); + } + if (this.cache instanceof GemFireCacheImpl) { + sender = new ParallelGatewaySenderImpl(this.cache, this.attrs); + ((GemFireCacheImpl)this.cache).addGatewaySender(sender); + + if (!this.attrs.isManualStart()) { + sender.start(); + } + } + else if (this.cache instanceof CacheCreation) { + sender = new ParallelGatewaySenderCreation(this.cache, this.attrs); + ((CacheCreation)this.cache).addGatewaySender(sender); + } + } + else { + if (this.attrs.getAsyncEventListeners().size() > 0) { + throw new GatewaySenderException( + LocalizedStrings.SerialGatewaySenderImpl_GATEWAY_0_CANNOT_DEFINE_A_REMOTE_SITE_BECAUSE_AT_LEAST_ONE_LISTENER_IS_ALREADY_ADDED + .toLocalizedString(id)); + } +// if (this.attrs.getOrderPolicy() != null) { +// if (this.attrs.getDispatcherThreads() == GatewaySender.DEFAULT_DISPATCHER_THREADS) { +// throw new GatewaySenderException( +// LocalizedStrings.SerialGatewaySender_INVALID_GATEWAY_SENDER_ORDER_POLICY_CONCURRENCY_0 +// .toLocalizedString(id)); +// } +// } + if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) { + this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY; + } + if (this.cache instanceof GemFireCacheImpl) { + sender = new SerialGatewaySenderImpl(this.cache, this.attrs); + ((GemFireCacheImpl)this.cache).addGatewaySender(sender); + + if (!this.attrs.isManualStart()) { + sender.start(); + } + } + else if (this.cache instanceof CacheCreation) { + sender = new SerialGatewaySenderCreation(this.cache, this.attrs); + ((CacheCreation)this.cache).addGatewaySender(sender); + } + } + return sender; + } + + public GatewaySender create(String id) { + this.attrs.id = id; + GatewaySender sender = null; + + if(this.attrs.getDispatcherThreads() <= 0) { + throw new AsyncEventQueueConfigurationException( + LocalizedStrings.AsyncEventQueue_0_CANNOT_HAVE_DISPATCHER_THREADS_LESS_THAN_1 + .toLocalizedString(id)); + } + + if (this.attrs.isParallel()) { + if ((this.attrs.getOrderPolicy() != null) + && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) { + throw new AsyncEventQueueConfigurationException( + LocalizedStrings.AsyncEventQueue_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1 + .toLocalizedString(id, this.attrs.getOrderPolicy())); + } + + if (this.cache instanceof GemFireCacheImpl) { + sender = new ParallelGatewaySenderImpl(this.cache, this.attrs); + ((GemFireCacheImpl)this.cache).addGatewaySender(sender); + if (!this.attrs.isManualStart()) { + sender.start(); + } + } + else if (this.cache instanceof CacheCreation) { + sender = new ParallelGatewaySenderCreation(this.cache, this.attrs); + ((CacheCreation)this.cache).addGatewaySender(sender); + } + } + else { +// if (this.attrs.getOrderPolicy() != null) { +// if (this.attrs.getDispatcherThreads() == GatewaySender.DEFAULT_DISPATCHER_THREADS) { +// throw new AsyncEventQueueConfigurationException( +// LocalizedStrings.AsyncEventQueue_INVALID_ORDER_POLICY_CONCURRENCY_0 +// .toLocalizedString(id)); +// } +// } + if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) { + this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY; + } + if (this.cache instanceof GemFireCacheImpl) { + sender = new SerialGatewaySenderImpl(this.cache, this.attrs); + ((GemFireCacheImpl)this.cache).addGatewaySender(sender); + if (!this.attrs.isManualStart()) { + sender.start(); + } + } + else if (this.cache instanceof CacheCreation) { + sender = new SerialGatewaySenderCreation(this.cache, this.attrs); + ((CacheCreation)this.cache).addGatewaySender(sender); + } + } + return sender; + } + + public GatewaySenderFactory removeGatewayEventFilter( + GatewayEventFilter filter) { + this.attrs.eventFilters.remove(filter); + return this; + } + + public GatewaySenderFactory removeGatewayTransportFilter( + GatewayTransportFilter filter) { + this.attrs.transFilters.remove(filter); + return this; + } + + public GatewaySenderFactory setGatewayEventSubstitutionFilter( + GatewayEventSubstitutionFilter filter) { + this.attrs.eventSubstitutionFilter = filter; + return this; + } + + public void configureGatewaySender(GatewaySender senderCreation) { + this.attrs.isParallel = senderCreation.isParallel(); + this.attrs.manualStart = senderCreation.isManualStart(); + this.attrs.socketBufferSize = senderCreation.getSocketBufferSize(); + this.attrs.socketReadTimeout = senderCreation.getSocketReadTimeout(); + this.attrs.isBatchConflationEnabled = senderCreation.isBatchConflationEnabled(); + this.attrs.batchSize = senderCreation.getBatchSize(); + this.attrs.batchTimeInterval = senderCreation.getBatchTimeInterval(); + this.attrs.isPersistenceEnabled = senderCreation.isPersistenceEnabled(); + this.attrs.diskStoreName = senderCreation.getDiskStoreName(); + this.attrs.isDiskSynchronous = senderCreation.isDiskSynchronous(); + this.attrs.maximumQueueMemory = senderCreation.getMaximumQueueMemory(); + this.attrs.alertThreshold = senderCreation.getAlertThreshold(); + this.attrs.dispatcherThreads = senderCreation.getDispatcherThreads(); + this.attrs.policy = senderCreation.getOrderPolicy(); + for(GatewayEventFilter filter : senderCreation.getGatewayEventFilters()){ + this.attrs.eventFilters.add(filter); + } + for(GatewayTransportFilter filter : senderCreation.getGatewayTransportFilters()){ + this.attrs.transFilters.add(filter); + } + this.attrs.eventSubstitutionFilter = senderCreation.getGatewayEventSubstitutionFilter(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java new file mode 100644 index 0000000..322b1ba --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.parallel; +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.EntryOperation; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; +import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; +import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats; +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; +import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile; +import com.gemstone.gemfire.distributed.internal.DistributionAdvisor; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.distributed.internal.ResourceEvent; +import com.gemstone.gemfire.internal.cache.DistributedRegion; +import com.gemstone.gemfire.internal.cache.EntryEventImpl; +import com.gemstone.gemfire.internal.cache.EventID; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper; +import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor; +import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier; +import com.gemstone.gemfire.internal.cache.wan.AbstractRemoteGatewaySender; +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes; +import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; + +/** + * @since GemFire 7.0 + * + */ +public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender { + + private static final Logger logger = LogService.getLogger(); + + final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup( + "Remote Site Discovery Logger Group", logger); + + public ParallelGatewaySenderImpl(){ + super(); + this.isParallel = true; + } + + public ParallelGatewaySenderImpl(Cache cache, GatewaySenderAttributes attrs) { + super(cache, attrs); + } + + @Override + public void start() { + this.getLifeCycleLock().writeLock().lock(); + try { + if (isRunning()) { + logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId())); + return; + } + + if (this.remoteDSId != DEFAULT_DISTRIBUTED_SYSTEM_ID) { + String locators = ((GemFireCacheImpl)this.cache).getDistributedSystem() + .getConfig().getLocators(); + if (locators.length() == 0) { + throw new IllegalStateException( + LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER + .toLocalizedString()); + } + } + /* + * Now onwards all processing will happen through "ConcurrentParallelGatewaySenderEventProcessor" + * we have made "ParallelGatewaySenderEventProcessor" and "ParallelGatewaySenderQueue" as a + * utility classes of Concurrent version of processor and queue. + */ + eventProcessor = new RemoteConcurrentParallelGatewaySenderEventProcessor(this); + /*if (getDispatcherThreads() > 1) { + eventProcessor = new ConcurrentParallelGatewaySenderEventProcessor(this); + } else { + eventProcessor = new ParallelGatewaySenderEventProcessor(this); + }*/ + + eventProcessor.start(); + waitForRunningStatus(); + //Only notify the type registry if this is a WAN gateway queue + if(!isAsyncEventQueue()) { + ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this); + } + new UpdateAttributesProcessor(this).distribute(false); + + InternalDistributedSystem system = (InternalDistributedSystem) this.cache + .getDistributedSystem(); + system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_START, this); + + logger.info(LocalizedMessage.create(LocalizedStrings.ParallelGatewaySenderImpl_STARTED__0, this)); + + enqueueTempEvents(); + } + finally { + this.getLifeCycleLock().writeLock().unlock(); + } + } + +// /** +// * The sender is not started but only the message queue i.e. shadowPR is created on the node. +// * @param targetPr +// */ +// private void createMessageQueueOnAccessorNode(PartitionedRegion targetPr) { +// eventProcessor = new ParallelGatewaySenderEventProcessor(this, targetPr); +// } + + + @Override + public void stop() { + this.getLifeCycleLock().writeLock().lock(); + try { + if (!this.isRunning()) { + return; + } + // Stop the dispatcher + AbstractGatewaySenderEventProcessor ev = this.eventProcessor; + //try { + if (ev != null && !ev.isStopped()) { + ev.stopProcessing(); + } + + // Stop the proxy (after the dispatcher, so the socket is still + // alive until after the dispatcher has stopped) + stompProxyDead(); + + // Close the listeners + for (AsyncEventListener listener : this.listeners) { + listener.close(); + } + //stop the running threads, open sockets if any + ((ConcurrentParallelGatewaySenderQueue)this.eventProcessor.getQueue()).cleanUp(); + + logger.info(LocalizedMessage.create(LocalizedStrings.GatewayImpl_STOPPED__0, this)); + + InternalDistributedSystem system = (InternalDistributedSystem) this.cache + .getDistributedSystem(); + system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this); + + clearTempEventsAfterSenderStopped(); + // Keep the eventProcessor around so we can ask it for the regionQueues later. + // Tests expect to be able to do this. +// } finally { +// this.eventProcessor = null; +// } + } + finally { + this.getLifeCycleLock().writeLock().unlock(); + } + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("ParallelGatewaySender{"); + sb.append("id=" + getId()); + sb.append(",remoteDsId="+ getRemoteDSId()); + sb.append(",isRunning ="+ isRunning()); + sb.append("}"); + return sb.toString(); + } + + public void fillInProfile(Profile profile) { + assert profile instanceof GatewaySenderProfile; + GatewaySenderProfile pf = (GatewaySenderProfile)profile; + pf.Id = getId(); + pf.remoteDSId = getRemoteDSId(); + pf.isRunning = isRunning(); + pf.isPrimary = isPrimary(); + pf.isParallel = true; + pf.isBatchConflationEnabled = isBatchConflationEnabled(); + pf.isPersistenceEnabled = isPersistenceEnabled(); + pf.alertThreshold = getAlertThreshold(); + pf.manualStart = isManualStart(); + pf.dispatcherThreads = getDispatcherThreads(); + pf.orderPolicy = getOrderPolicy(); + for (com.gemstone.gemfire.cache.wan.GatewayEventFilter filter : getGatewayEventFilters()) { + pf.eventFiltersClassNames.add(filter.getClass().getName()); + } + for (GatewayTransportFilter filter : getGatewayTransportFilters()) { + pf.transFiltersClassNames.add(filter.getClass().getName()); + } + for (AsyncEventListener listener : getAsyncEventListeners()) { + pf.senderEventListenerClassNames.add(listener.getClass().getName()); + } + pf.isDiskSynchronous = isDiskSynchronous(); + } + + /* (non-Javadoc) + * @see com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender#setModifiedEventId(com.gemstone.gemfire.internal.cache.EntryEventImpl) + */ + @Override + protected void setModifiedEventId(EntryEventImpl clonedEvent) { + int bucketId = -1; + //merged from 42004 + if (clonedEvent.getRegion() instanceof DistributedRegion) { +// if (getOrderPolicy() == OrderPolicy.THREAD) { +// bucketId = PartitionedRegionHelper.getHashKey( +// ((EntryEventImpl)clonedEvent).getEventId().getThreadID(), +// getMaxParallelismForReplicatedRegion()); +// } +// else + bucketId = PartitionedRegionHelper.getHashKey(clonedEvent.getKey(), + getMaxParallelismForReplicatedRegion()); + } + else { + bucketId = PartitionedRegionHelper + .getHashKey((EntryOperation)clonedEvent); + } + EventID originalEventId = clonedEvent.getEventId(); + long originatingThreadId = ThreadIdentifier.getRealThreadID(originalEventId.getThreadID()); + + long newThreadId = ThreadIdentifier + .createFakeThreadIDForParallelGSPrimaryBucket(bucketId, + originatingThreadId, getEventIdIndex()); + + // In case of parallel as all events go through primary buckets + // we don't need to generate different threadId for secondary buckets + // as they will be rejected if seen at PR level itself + +// boolean isPrimary = ((PartitionedRegion)getQueue().getRegion()) +// .getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary(); +// if (isPrimary) { +// newThreadId = ThreadIdentifier +// .createFakeThreadIDForParallelGSPrimaryBucket(bucketId, +// originatingThreadId); +// } else { +// newThreadId = ThreadIdentifier +// .createFakeThreadIDForParallelGSSecondaryBucket(bucketId, +// originatingThreadId); +// } + + EventID newEventId = new EventID(originalEventId.getMembershipID(), + newThreadId, originalEventId.getSequenceID(), bucketId); + if (logger.isDebugEnabled()) { + logger.debug("{}: Generated event id for event with key={}, bucketId={}, original event id={}, threadId={}, new event id={}, newThreadId={}", + this, clonedEvent.getKey(), bucketId, originalEventId, originatingThreadId, newEventId, newThreadId); + } + clonedEvent.setEventId(newEventId); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/701c6861/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java new file mode 100644 index 0000000..35cdece --- /dev/null +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/RemoteConcurrentParallelGatewaySenderEventProcessor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.parallel; + + +import java.util.Set; + +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher; +import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor; +/** + * Remote version of GatewaySenderEvent Processor + * + */ +public class RemoteConcurrentParallelGatewaySenderEventProcessor extends ConcurrentParallelGatewaySenderEventProcessor{ + + public RemoteConcurrentParallelGatewaySenderEventProcessor( + AbstractGatewaySender sender) { + super(sender); + } + + @Override + protected void createProcessors(int dispatcherThreads, Set targetRs) { + processors = new RemoteParallelGatewaySenderEventProcessor[sender.getDispatcherThreads()]; + if (logger.isDebugEnabled()) { + logger.debug("Creating GatewaySenderEventProcessor"); + } + for (int i = 0; i < sender.getDispatcherThreads(); i++) { + processors[i] = new RemoteParallelGatewaySenderEventProcessor(sender, + targetRs, i, sender.getDispatcherThreads()); + } + } + + @Override + protected void rebalance() { + GatewaySenderStats statistics = this.sender.getStatistics(); + long startTime = statistics.startLoadBalance(); + try { + for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) { + GatewaySenderEventRemoteDispatcher remoteDispatcher = (GatewaySenderEventRemoteDispatcher)parallelProcessor.getDispatcher(); + if (remoteDispatcher.isConnectedToRemote()) { + remoteDispatcher.stopAckReaderThread(); + remoteDispatcher.destroyConnection(); + } + } + } finally { + statistics.endLoadBalance(startTime); + } + } +}