Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3963C18EDA for ; Thu, 17 Mar 2016 21:14:06 +0000 (UTC) Received: (qmail 31996 invoked by uid 500); 17 Mar 2016 21:14:06 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 31966 invoked by uid 500); 17 Mar 2016 21:14:06 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 31957 invoked by uid 99); 17 Mar 2016 21:14:06 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Mar 2016 21:14:06 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id C1746C0D64 for ; Thu, 17 Mar 2016 21:14:05 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id ccKzbbW2VNFd for ; Thu, 17 Mar 2016 21:14:01 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 98DD35F22F for ; Thu, 17 Mar 2016 21:14:00 +0000 (UTC) Received: (qmail 30352 invoked by uid 99); 17 Mar 2016 21:14:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Mar 2016 21:14:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 118AFE0120; Thu, 17 Mar 2016 21:14:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: klund@apache.org To: commits@geode.incubator.apache.org Date: Thu, 17 Mar 2016 21:14:07 -0000 Message-Id: <158bba9aef9448bbac21188b34fb53ce@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [09/26] incubator-geode git commit: GEODE-478: GatewaySender now handles MessageTooLargeExceptions GEODE-478: GatewaySender now handles MessageTooLargeExceptions Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a904f147 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a904f147 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a904f147 Branch: refs/heads/feature/GEODE-949-2 Commit: a904f1474ec3153bc39f650d49731214f25c6230 Parents: 4d0dfc5 Author: Barry Oglesby Authored: Tue Mar 8 15:55:34 2016 -0800 Committer: Barry Oglesby Committed: Wed Mar 16 09:56:41 2016 -0700 ---------------------------------------------------------------------- .../internal/cache/tier/sockets/Message.java | 2 +- .../AbstractGatewaySenderEventProcessor.java | 33 +++++++-- .../parallel/ParallelGatewaySenderQueue.java | 74 ++++++++++++++++---- .../gemfire/internal/i18n/LocalizedStrings.java | 6 +- .../wan/GatewaySenderEventRemoteDispatcher.java | 31 +++++++- .../gemfire/internal/cache/wan/WANTestBase.java | 26 +++++-- ...arallelGatewaySenderOperationsDUnitTest.java | 35 +++++++++ 7 files changed, 177 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java index a6495e2..44c88c1 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java @@ -564,7 +564,7 @@ public class Message { msgLen = (int)(headerLen + totalPartLen); if (msgLen > MAX_MESSAGE_SIZE) { - throw new MessageTooLargeException("Message size(" + msgLen + throw new MessageTooLargeException("Message size (" + msgLen + ") exceeds gemfire.client.max-message-size setting (" + MAX_MESSAGE_SIZE + ")"); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 86ecce1..51b125a 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -38,12 +38,9 @@ import com.gemstone.gemfire.cache.EntryEvent; import com.gemstone.gemfire.cache.Operation; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionDestroyedException; -import com.gemstone.gemfire.cache.client.internal.Connection; -import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException; import com.gemstone.gemfire.cache.wan.GatewayEventFilter; import com.gemstone.gemfire.cache.wan.GatewayQueueEvent; import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.internal.Version; import com.gemstone.gemfire.internal.cache.BucketRegion; import com.gemstone.gemfire.internal.cache.Conflatable; import com.gemstone.gemfire.internal.cache.DistributedRegion; @@ -143,6 +140,13 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { private volatile boolean resetLastPeekedEvents; private long numEventsDispatched; + + /** + * The batchSize is the batch size being used by this processor. By default, it is the + * configured batch size of the GatewaySender. It may be automatically reduced if a + * MessageTooLargeException occurs. + */ + private int batchSize; /** * @param createThreadGroup @@ -152,6 +156,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { String string, GatewaySender sender) { super(createThreadGroup, string); this.sender = (AbstractGatewaySender)sender; + this.batchSize = sender.getBatchSize(); } abstract protected void initializeMessageQueue(String id); @@ -214,6 +219,23 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { this.resetLastPeekedEvents = true; } + protected int getBatchSize() { + return this.batchSize; + } + + protected void setBatchSize(int batchSize) { + int currentBatchSize = this.batchSize; + if (batchSize <= 0) { + this.batchSize = 1; + logger.warn(LocalizedMessage.create( + LocalizedStrings.AbstractGatewaySenderEventProcessor_ATTEMPT_TO_SET_BATCH_SIZE_FAILED, new Object[] { currentBatchSize, batchSize })); + } else { + this.batchSize = batchSize; + logger.info(LocalizedMessage.create( + LocalizedStrings.AbstractGatewaySenderEventProcessor_SET_BATCH_SIZE, new Object[] { currentBatchSize, this.batchSize })); + } + } + /** * Returns the current batch id to be used to identify the next batch. * @@ -387,7 +409,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { final boolean isDebugEnabled = logger.isDebugEnabled(); final boolean isTraceEnabled = logger.isTraceEnabled(); - final int batchSize = sender.getBatchSize(); final int batchTimeInterval = sender.getBatchTimeInterval(); final GatewaySenderStats statistics = this.sender.getStatistics(); @@ -417,7 +438,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { // Peek a batch if (isDebugEnabled) { - logger.debug("Attempting to peek a batch of {} events", batchSize); + logger.debug("Attempting to peek a batch of {} events", this.batchSize); } for (;;) { // check before sleeping @@ -481,7 +502,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { } }*/ } - events = this.queue.peek(batchSize, batchTimeInterval); + events = this.queue.peek(this.batchSize, batchTimeInterval); } catch (InterruptedException e) { interrupted = true; this.sender.getCancelCriterion().checkCancelInProgress(e); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a904f147/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index c00903f..a9d0f3e 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -54,7 +54,6 @@ import com.gemstone.gemfire.cache.EvictionAttributes; import com.gemstone.gemfire.cache.PartitionAttributesFactory; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionAttributes; -import com.gemstone.gemfire.cache.RegionDestroyedException; import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; import com.gemstone.gemfire.distributed.internal.DM; import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; @@ -143,7 +142,17 @@ public class ParallelGatewaySenderQueue implements RegionQueue { private static BatchRemovalThread removalThread = null; protected BlockingQueue peekedEvents = new LinkedBlockingQueue(); - + + /** + * The peekedEventsProcessing queue is used when the batch size is reduced due to a MessageTooLargeException + */ + private BlockingQueue peekedEventsProcessing = new LinkedBlockingQueue(); + + /** + * The peekedEventsProcessingInProgress boolean denotes that processing existing peeked events is in progress + */ + private boolean peekedEventsProcessingInProgress = false; + public final AbstractGatewaySender sender ; public static final int WAIT_CYCLE_SHADOW_BUCKET_LOAD = 10; @@ -1147,6 +1156,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue { public void resetLastPeeked() { this.resetLastPeeked = true; + + // Reset the in progress boolean and queue for peeked events in progress + this.peekedEventsProcessingInProgress = false; + this.peekedEventsProcessing.clear(); } // Need to improve here.If first peek returns NULL then look in another bucket. @@ -1283,19 +1296,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { long start = System.currentTimeMillis(); long end = start + timeToWait; - if (this.resetLastPeeked) { - batch.addAll(peekedEvents); - this.resetLastPeeked = false; - if (isDebugEnabled) { - StringBuffer buffer = new StringBuffer(); - for (GatewaySenderEventImpl ge : peekedEvents) { - buffer.append("event :"); - buffer.append(ge); - } - logger.debug("Adding already peeked events to the batch {}", buffer); - } - } - + // Add peeked events + addPeekedEvents(batch, batchSize); + int bId = -1; while (batch.size() < batchSize) { if (areLocalBucketQueueRegionsPresent() @@ -1372,6 +1375,47 @@ public class ParallelGatewaySenderQueue implements RegionQueue { return batch; } + private void addPeekedEvents(List batch, int batchSize) { + if (this.resetLastPeeked) { + if (this.peekedEventsProcessingInProgress) { + // Peeked event processing is in progress. This means that the original peekedEvents + // contained > batch size events due to a reduction in the batch size. Create a batch + // from the peekedEventsProcessing queue. + addPreviouslyPeekedEvents(batch, batchSize); + } else if (peekedEvents.size() <= batchSize) { + // This is the normal case. The connection was lost while processing a batch. + // This recreates the batch from the current peekedEvents. + batch.addAll(peekedEvents); + this.resetLastPeeked = false; + } else { + // The peekedEvents queue is > batch size. This means that the previous batch size was + // reduced due to MessageTooLargeException. Create a batch from the peekedEventsProcessing queue. + this.peekedEventsProcessing.addAll(this.peekedEvents); + this.peekedEventsProcessingInProgress = true; + addPreviouslyPeekedEvents(batch, batchSize); + } + if (logger.isDebugEnabled()) { + StringBuffer buffer = new StringBuffer(); + for (Object ge : batch) { + buffer.append("event :"); + buffer.append(ge); + } + logger.debug("Adding already peeked events to the batch {}", buffer); + } + } + } + + private void addPreviouslyPeekedEvents(List batch, int batchSize) { + for (int i=0; i WANTestBase.verifySenderDestroyed( "ln", true )); } + public void testParallelGatewaySenderMessageTooLargeException() { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + // Create and start sender with reduced maximum message size and 1 dispatcher thread + String regionName = getTestMethodName() + "_PR"; + vm4.invoke(() -> setMaximumMessageSize( 1024*1024 )); + vm4.invoke(() -> createCache( lnPort )); + vm4.invoke(() -> setNumDispatcherThreadsForTheRun( 1 )); + vm4.invoke(() -> createSender( "ln", 2, true, 100, 100, false, false, null, false )); + vm4.invoke(() -> createPartitionedRegion( regionName, "ln", 0, 100, isOffHeap() )); + + // Do puts + int numPuts = 200; + vm4.invoke(() -> doPuts( regionName, numPuts, new byte[11000] )); + validateRegionSizes(regionName, numPuts, vm4); + + // Start receiver + IgnoredException ignoredMTLE = IgnoredException.addIgnoredException(MessageTooLargeException.class.getName(), vm4); + IgnoredException ignoredGIOE = IgnoredException.addIgnoredException(GemFireIOException.class.getName(), vm4); + vm2.invoke(() -> createReceiver( nyPort )); + vm2.invoke(() -> createPartitionedRegion( regionName, null, 0, 100, isOffHeap() )); + validateRegionSizes( regionName, numPuts, vm2 ); + ignoredMTLE.remove(); + ignoredGIOE.remove(); + } + + private void setMaximumMessageSize(int maximumMessageSizeBytes) { + System.setProperty("gemfire.client.max-message-size", String.valueOf(maximumMessageSizeBytes)); + LogWriterUtils.getLogWriter().info("Set gemfire.client.max-message-size: " + System.getProperty("gemfire.client.max-message-size")); + } + private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort, boolean createAccessors, boolean startSenders) { // Note: This is a test-specific method used by several test to create