Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5D77A200C2A for ; Tue, 14 Feb 2017 21:35:26 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 5C525160B45; Tue, 14 Feb 2017 20:35:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F00B3160B75 for ; Tue, 14 Feb 2017 21:35:24 +0100 (CET) Received: (qmail 9391 invoked by uid 500); 14 Feb 2017 20:35:24 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 9185 invoked by uid 99); 14 Feb 2017 20:35:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Feb 2017 20:35:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B9D3BE0A6B; Tue, 14 Feb 2017 20:35:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: upthewaterspout@apache.org To: commits@geode.apache.org Date: Tue, 14 Feb 2017 20:35:29 -0000 Message-Id: <133f06fabe284493921e48d14f1839dd@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [07/13] geode git commit: GEODE-2398: Retry oplog channel.write on silent failures archived-at: Tue, 14 Feb 2017 20:35:26 -0000 GEODE-2398: Retry oplog channel.write on silent failures Implemented limited retries in two forms of Oplog.flush() when channel.write() is called. 
If write() reports fewer bytes written than the change in the ByteBuffer positions, reset the buffer positions and retry the write a limited number of times. Throws IOException if the write doesn't succeed after a few retries (the maximum number of retries is defined by a static constant). Added new unit tests. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/9b0f1657 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/9b0f1657 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/9b0f1657 Branch: refs/heads/feature/GEODE-2402 Commit: 9b0f16570aad4abc82b71d0d16167a9774449d41 Parents: 5d98a8c Author: Ken Howe Authored: Wed Feb 1 09:16:40 2017 -0800 Committer: Ken Howe Committed: Mon Feb 13 13:50:07 2017 -0800 ---------------------------------------------------------------------- .../org/apache/geode/internal/cache/Oplog.java | 66 ++++- .../geode/internal/cache/OplogFlushTest.java | 258 +++++++++++++++++++ 2 files changed, 321 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/9b0f1657/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java index 0b98364..4e426a0 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java @@ -5186,7 +5186,14 @@ public final class Oplog implements CompactableOplog, Flushable { // flush(olf, true); } + private static final int MAX_CHANNEL_RETRIES = 5; + private final void flush(OplogFile olf, boolean doSync) throws IOException { + int flushed; + int channelBytesWritten; + int numChannelRetries = 0; + int bbStartPos; + long channelStartPos; try { 
synchronized (this.lock/* olf */) { if (olf.RAFClosed) { @@ -5195,9 +5202,27 @@ public final class Oplog implements CompactableOplog, Flushable { ByteBuffer bb = olf.writeBuf; if (bb != null && bb.position() != 0) { bb.flip(); - int flushed = 0; + flushed = 0; do { - flushed += olf.channel.write(bb); + channelBytesWritten = 0; + bbStartPos = bb.position(); + channelStartPos = olf.channel.position(); + // differentiate between bytes written on this channel.write() iteration and the + // total number of bytes written to the channel on this call + channelBytesWritten = olf.channel.write(bb); + flushed += channelBytesWritten; + // Expect channelBytesWritten and the changes in pp.position() and channel.position() to + // be the same. If they are not, then the channel.write() silently failed. The following + // retry separates spurious failures from permanent channel failures. + if (channelBytesWritten != bb.position() - bbStartPos) { + if (numChannelRetries++ < MAX_CHANNEL_RETRIES) { + // Reset the ByteBuffer position, but take into account anything that did get + // written to the channel + bb.position(bbStartPos + (int) (olf.channel.position() - channelStartPos)); + } else { + throw new IOException("Failed to write Oplog entry to" + olf.f.getName()); + } + } } while (bb.hasRemaining()); // update bytesFlushed after entire writeBuffer is flushed to fix bug // 41201 @@ -5222,6 +5247,11 @@ public final class Oplog implements CompactableOplog, Flushable { private final void flush(OplogFile olf, ByteBuffer b1, ByteBuffer b2) throws IOException { try { + long channelStartPos; + long expectedWritten; + long flushed; + int numChannelRetries = 0; + boolean retryWrite = false; synchronized (this.lock/* olf */) { if (olf.RAFClosed) { return; @@ -5229,7 +5259,25 @@ public final class Oplog implements CompactableOplog, Flushable { this.bbArray[0] = b1; this.bbArray[1] = b2; b1.flip(); - long flushed = olf.channel.write(this.bbArray); + int b1StartPos = b1.position(); + int 
b2StartPos = b2.position(); + expectedWritten = b1.limit() - b1StartPos + b2.limit() - b2StartPos; + channelStartPos = olf.channel.position(); + + do { + retryWrite = false; + flushed = olf.channel.write(this.bbArray); + if (flushed != expectedWritten) { + if (numChannelRetries++ < MAX_CHANNEL_RETRIES) { + retryWrite = true; + olf.channel.position(channelStartPos); + b1.position(b1StartPos); + b2.position(b2StartPos); + } else { + throw new IOException("Failed to write Oplog entry to" + olf.f.getName()); + } + } + } while (retryWrite); this.bbArray[0] = null; this.bbArray[1] = null; // update bytesFlushed after entire writeBuffer is flushed to fix bug 41201 @@ -6382,6 +6430,18 @@ public final class Oplog implements CompactableOplog, Flushable { return "oplog#" + getOplogId() /* + "DEBUG" + System.identityHashCode(this) */; } + /** + * Method to be used only for testing + * + * @param ch Object to replace the channel in the Oplog.crf + * @return original channel object + */ + UninterruptibleFileChannel testSetCrfChannel(UninterruptibleFileChannel ch) { + UninterruptibleFileChannel chPrev = this.crf.channel; + this.crf.channel = ch; + return chPrev; + } + // //////// Methods used during recovery ////////////// // ////////////////////Inner Classes ////////////////////// http://git-wip-us.apache.org/repos/asf/geode/blob/9b0f1657/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java new file mode 100644 index 0000000..1d484e4 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. 
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache; + +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.spy; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.rules.TestName; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.apache.geode.cache.DiskAccessException; +import org.apache.geode.internal.cache.persistence.UninterruptibleFileChannel; +import org.apache.geode.test.junit.categories.IntegrationTest; + +/** + * Testing recovery from failures writing Oplog entries + */ +@Category(IntegrationTest.class) +public class OplogFlushTest extends DiskRegionTestingBase { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + // How many times to fake the write failures + private int nFakeChannelWrites = 0; + private Oplog ol = null; + private ByteBuffer bb1 = null; + private ByteBuffer bb2 = null; + private ByteBuffer[] bbArray = new ByteBuffer[2]; + private UninterruptibleFileChannel 
ch; + private UninterruptibleFileChannel spyCh; + + @Rule + public TestName name = new TestName(); + + class FakeChannelWriteBB implements Answer { + + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + return fakeWriteBB(ol, bb1); + } + } + + private int fakeWriteBB(Oplog ol, ByteBuffer bb) throws IOException { + if (nFakeChannelWrites > 0) { + bb.position(bb.limit()); + --nFakeChannelWrites; + return 0; + } + doCallRealMethod().when(spyCh).write(bb); + return spyCh.write(bb); + } + + private void verifyBB(ByteBuffer bb, byte[] src) { + bb.flip(); + for (int i = 0; i < src.length; ++i) { + assertEquals("Channel contents does not match expected at index " + i, src[i], bb.get()); + } + } + + private void doChannelFlushWithFailures(Oplog[] oplogs, int numFailures) throws IOException { + nFakeChannelWrites = numFailures; + ol = oplogs[0]; + ch = ol.getFileChannel(); + spyCh = spy(ch); + ol.testSetCrfChannel(spyCh); + + byte[] entry1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}; + byte[] entry2 = {100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, + 116, 117, 118, 119}; + + bb1 = ol.getWriteBuf(); + try { + // Force channel.write() failures when writing the first entry + doAnswer(new FakeChannelWriteBB()).when(spyCh).write(bb1); + long chStartPos = ol.getFileChannel().position(); + bb1.clear(); + bb1.put(entry1); + ol.flushAll(true); + + // Write the 2nd entry without forced channel failures + nFakeChannelWrites = 0; + bb1 = ol.getWriteBuf(); + bb1.clear(); + bb1.put(entry2); + ol.flushAll(true); + long chEndPos = ol.getFileChannel().position(); + assertEquals("Change in channel position does not equal the size of the data flushed", + entry1.length + entry2.length, chEndPos - chStartPos); + ByteBuffer dst = ByteBuffer.allocateDirect(entry1.length); + ol.getFileChannel().position(chStartPos); + ol.getFileChannel().read(dst); + verifyBB(dst, entry1); + } finally { + 
region.destroyRegion(); + } + } + + @Test + public void testAsyncChannelWriteRetriesOnFailureDuringFlush() throws Exception { + region = DiskRegionHelperFactory.getAsyncOverFlowAndPersistRegion(cache, null); + DiskRegion dr = ((LocalRegion) region).getDiskRegion(); + Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs(); + assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs); + assertNotNull("Unexpected null Oplog", oplogs[0]); + + doChannelFlushWithFailures(oplogs, 1 /* write failure */); + } + + @Test + public void testChannelWriteRetriesOnFailureDuringFlush() throws Exception { + region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null); + DiskRegion dr = ((LocalRegion) region).getDiskRegion(); + Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs(); + assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs); + assertNotNull("Unexpected null Oplog", oplogs[0]); + + doChannelFlushWithFailures(oplogs, 1 /* write failure */); + } + + @Test + public void testChannelRecoversFromWriteFailureRepeatedRetriesDuringFlush() throws Exception { + region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null); + DiskRegion dr = ((LocalRegion) region).getDiskRegion(); + Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs(); + assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs); + assertNotNull("Unexpected null Oplog", oplogs[0]); + + doChannelFlushWithFailures(oplogs, 3 /* write failures */); + } + + @Test + public void testOplogFlushThrowsIOExceptioniWhenNumberOfChannelWriteRetriesExceedsLimit() + throws Exception { + expectedException.expect(DiskAccessException.class); + expectedException.expectCause(instanceOf(IOException.class)); + region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null); + DiskRegion dr = ((LocalRegion) region).getDiskRegion(); + Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs(); + 
assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs); + assertNotNull("Unexpected null Oplog", oplogs[0]); + + doChannelFlushWithFailures(oplogs, 6 /* exceeds the retry limit in Oplog */); + } + + class FakeChannelWriteBBArray implements Answer { + + @Override + public Long answer(InvocationOnMock invocation) throws Throwable { + bbArray = ol.bbArray; + return fakeWriteBBArray(ol, bbArray); + } + } + + private long fakeWriteBBArray(Oplog ol, ByteBuffer[] bbA) throws IOException { + if (nFakeChannelWrites > 0) { + for (int i = 0; i < bbA.length; ++i) { + bbA[i].position(bbA[i].limit()); + } + --nFakeChannelWrites; + return 0; + } + doCallRealMethod().when(spyCh).write(bbA); + return spyCh.write(bbA); + } + + private void doChannelBBArrayFlushWithFailures(Oplog[] oplogs, int numFailures) + throws IOException { + nFakeChannelWrites = numFailures; + ol = oplogs[0]; + ch = ol.getFileChannel(); + spyCh = spy(ch); + ol.testSetCrfChannel(spyCh); + + byte[] entry1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}; + byte[] entry2 = {16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31}; + byte[] entry3 = {32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47}; + + bb1 = ByteBuffer.allocateDirect(entry1.length); + bb2 = ByteBuffer.allocateDirect(entry2.length); + ByteBuffer[] bbArray = ol.bbArray; + try { + // Force channel.write() failures when writing the first entry + doAnswer(new FakeChannelWriteBBArray()).when(spyCh).write(bbArray); + long chStartPos = ol.getFileChannel().position(); + bb1.clear(); + bb1.put(entry1); + bb2.clear(); + bb2.put(entry2); + bb2.flip(); + ol.flush(bb1, bb2); + + // Write the 2nd entry without forced channel failures + nFakeChannelWrites = 0; + bb1.clear(); + bb1.put(entry2); + bb1 = ol.getWriteBuf(); + ol.flushAll(true); + long chEndPos = ol.getFileChannel().position(); + assertEquals("Change in channel position does not equal the size of the data flushed", + entry1.length + entry2.length, chEndPos - 
chStartPos); + ByteBuffer dst = ByteBuffer.allocateDirect(entry1.length); + ol.getFileChannel().position(chStartPos); + ol.getFileChannel().read(dst); + verifyBB(dst, entry1); + } finally { + region.destroyRegion(); + } + } + + @Test + public void testChannelRecoversFromWriteFailureOfByteBufferArray() throws Exception { + region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null); + DiskRegion dr = ((LocalRegion) region).getDiskRegion(); + Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs(); + assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs); + assertNotNull("Unexpected null Oplog", oplogs[0]); + + doChannelBBArrayFlushWithFailures(oplogs, 1 /* write failures */); + } + + @Test + public void testOplogFlushOfByteBufferArrayThrowsIOExceptionWhenNumberOfChannelWriteRetriesExceedsLimit() + throws Exception { + expectedException.expect(instanceOf(IOException.class)); + region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null); + DiskRegion dr = ((LocalRegion) region).getDiskRegion(); + Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs(); + assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs); + assertNotNull("Unexpected null Oplog", oplogs[0]); + + doChannelBBArrayFlushWithFailures(oplogs, 6 /* exceeds the retry limit in Oplog */); + } +}