geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kh...@apache.org
Subject geode git commit: GEODE-2398: Retry oplog channel.write on silent failures
Date Thu, 09 Feb 2017 18:05:35 GMT
Repository: geode
Updated Branches:
  refs/heads/feature/GEODE-2398 [created] 451c5b497


GEODE-2398: Retry oplog channel.write on silent failures

Implemented limited retries in two forms of Oplog.flush() when channel.write() is called.
If write() returns fewer bytes written than the change in the ByteBuffer positions, then reset
the buffer positions and retry writing a limited number of times. Throws
IOException if the write doesn't succeed after a few retries (the max
number of retries is defined by a static constant).

Added new unit tests.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/451c5b49
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/451c5b49
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/451c5b49

Branch: refs/heads/feature/GEODE-2398
Commit: 451c5b497662485bfb94b7f7afaacfd2cd82d043
Parents: 0d0c7c4
Author: Ken Howe <khowe@pivotal.io>
Authored: Wed Feb 1 09:16:40 2017 -0800
Committer: Ken Howe <khowe@pivotal.io>
Committed: Thu Feb 9 09:29:25 2017 -0800

----------------------------------------------------------------------
 .../org/apache/geode/internal/cache/Oplog.java  |  66 ++++-
 .../geode/internal/cache/OplogFlushTest.java    | 258 +++++++++++++++++++
 2 files changed, 321 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/451c5b49/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index 0b98364..4e426a0 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -5186,7 +5186,14 @@ public final class Oplog implements CompactableOplog, Flushable {
     // flush(olf, true);
   }
 
+  private static final int MAX_CHANNEL_RETRIES = 5;
+
   private final void flush(OplogFile olf, boolean doSync) throws IOException {
+    int flushed;
+    int channelBytesWritten;
+    int numChannelRetries = 0;
+    int bbStartPos;
+    long channelStartPos;
     try {
       synchronized (this.lock/* olf */) {
         if (olf.RAFClosed) {
@@ -5195,9 +5202,27 @@ public final class Oplog implements CompactableOplog, Flushable {
         ByteBuffer bb = olf.writeBuf;
         if (bb != null && bb.position() != 0) {
           bb.flip();
-          int flushed = 0;
+          flushed = 0;
           do {
-            flushed += olf.channel.write(bb);
+            channelBytesWritten = 0;
+            bbStartPos = bb.position();
+            channelStartPos = olf.channel.position();
+            // differentiate between bytes written on this channel.write() iteration and
the
+            // total number of bytes written to the channel on this call
+            channelBytesWritten = olf.channel.write(bb);
+            flushed += channelBytesWritten;
+            // Expect channelBytesWritten and the changes in pp.position() and channel.position()
to
+            // be the same. If they are not, then the channel.write() silently failed. The
following
+            // retry separates spurious failures from permanent channel failures.
+            if (channelBytesWritten != bb.position() - bbStartPos) {
+              if (numChannelRetries++ < MAX_CHANNEL_RETRIES) {
+                // Reset the ByteBuffer position, but take into account anything that did
get
+                // written to the channel
+                bb.position(bbStartPos + (int) (olf.channel.position() - channelStartPos));
+              } else {
+                throw new IOException("Failed to write Oplog entry to" + olf.f.getName());
+              }
+            }
           } while (bb.hasRemaining());
           // update bytesFlushed after entire writeBuffer is flushed to fix bug
           // 41201
@@ -5222,6 +5247,11 @@ public final class Oplog implements CompactableOplog, Flushable {
 
   private final void flush(OplogFile olf, ByteBuffer b1, ByteBuffer b2) throws IOException
{
     try {
+      long channelStartPos;
+      long expectedWritten;
+      long flushed;
+      int numChannelRetries = 0;
+      boolean retryWrite = false;
       synchronized (this.lock/* olf */) {
         if (olf.RAFClosed) {
           return;
@@ -5229,7 +5259,25 @@ public final class Oplog implements CompactableOplog, Flushable {
         this.bbArray[0] = b1;
         this.bbArray[1] = b2;
         b1.flip();
-        long flushed = olf.channel.write(this.bbArray);
+        int b1StartPos = b1.position();
+        int b2StartPos = b2.position();
+        expectedWritten = b1.limit() - b1StartPos + b2.limit() - b2StartPos;
+        channelStartPos = olf.channel.position();
+
+        do {
+          retryWrite = false;
+          flushed = olf.channel.write(this.bbArray);
+          if (flushed != expectedWritten) {
+            if (numChannelRetries++ < MAX_CHANNEL_RETRIES) {
+              retryWrite = true;
+              olf.channel.position(channelStartPos);
+              b1.position(b1StartPos);
+              b2.position(b2StartPos);
+            } else {
+              throw new IOException("Failed to write Oplog entry to" + olf.f.getName());
+            }
+          }
+        } while (retryWrite);
         this.bbArray[0] = null;
         this.bbArray[1] = null;
         // update bytesFlushed after entire writeBuffer is flushed to fix bug 41201
@@ -6382,6 +6430,18 @@ public final class Oplog implements CompactableOplog, Flushable {
     return "oplog#" + getOplogId() /* + "DEBUG" + System.identityHashCode(this) */;
   }
 
+  /**
+   * Method to be used only for testing
+   * 
+   * @param ch Object to replace the channel in the Oplog.crf
+   * @return original channel object
+   */
+  UninterruptibleFileChannel testSetCrfChannel(UninterruptibleFileChannel ch) {
+    UninterruptibleFileChannel chPrev = this.crf.channel;
+    this.crf.channel = ch;
+    return chPrev;
+  }
+
   // //////// Methods used during recovery //////////////
 
   // ////////////////////Inner Classes //////////////////////

http://git-wip-us.apache.org/repos/asf/geode/blob/451c5b49/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
new file mode 100644
index 0000000..1d484e4
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.geode.cache.DiskAccessException;
+import org.apache.geode.internal.cache.persistence.UninterruptibleFileChannel;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+/**
+ * Testing recovery from failures writing Oplog entries
+ */
+@Category(IntegrationTest.class)
+public class OplogFlushTest extends DiskRegionTestingBase {
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  // How many times to fake the write failures
+  private int nFakeChannelWrites = 0;
+  private Oplog ol = null;
+  private ByteBuffer bb1 = null;
+  private ByteBuffer bb2 = null;
+  private ByteBuffer[] bbArray = new ByteBuffer[2];
+  private UninterruptibleFileChannel ch;
+  private UninterruptibleFileChannel spyCh;
+
+  @Rule
+  public TestName name = new TestName();
+
+  class FakeChannelWriteBB implements Answer<Integer> {
+
+    @Override
+    public Integer answer(InvocationOnMock invocation) throws Throwable {
+      return fakeWriteBB(ol, bb1);
+    }
+  }
+
+  private int fakeWriteBB(Oplog ol, ByteBuffer bb) throws IOException {
+    if (nFakeChannelWrites > 0) {
+      bb.position(bb.limit());
+      --nFakeChannelWrites;
+      return 0;
+    }
+    doCallRealMethod().when(spyCh).write(bb);
+    return spyCh.write(bb);
+  }
+
+  private void verifyBB(ByteBuffer bb, byte[] src) {
+    bb.flip();
+    for (int i = 0; i < src.length; ++i) {
+      assertEquals("Channel contents does not match expected at index " + i, src[i], bb.get());
+    }
+  }
+
+  private void doChannelFlushWithFailures(Oplog[] oplogs, int numFailures) throws IOException
{
+    nFakeChannelWrites = numFailures;
+    ol = oplogs[0];
+    ch = ol.getFileChannel();
+    spyCh = spy(ch);
+    ol.testSetCrfChannel(spyCh);
+
+    byte[] entry1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19};
+    byte[] entry2 = {100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113,
114, 115,
+        116, 117, 118, 119};
+
+    bb1 = ol.getWriteBuf();
+    try {
+      // Force channel.write() failures when writing the first entry
+      doAnswer(new FakeChannelWriteBB()).when(spyCh).write(bb1);
+      long chStartPos = ol.getFileChannel().position();
+      bb1.clear();
+      bb1.put(entry1);
+      ol.flushAll(true);
+
+      // Write the 2nd entry without forced channel failures
+      nFakeChannelWrites = 0;
+      bb1 = ol.getWriteBuf();
+      bb1.clear();
+      bb1.put(entry2);
+      ol.flushAll(true);
+      long chEndPos = ol.getFileChannel().position();
+      assertEquals("Change in channel position does not equal the size of the data flushed",
+          entry1.length + entry2.length, chEndPos - chStartPos);
+      ByteBuffer dst = ByteBuffer.allocateDirect(entry1.length);
+      ol.getFileChannel().position(chStartPos);
+      ol.getFileChannel().read(dst);
+      verifyBB(dst, entry1);
+    } finally {
+      region.destroyRegion();
+    }
+  }
+
+  @Test
+  public void testAsyncChannelWriteRetriesOnFailureDuringFlush() throws Exception {
+    region = DiskRegionHelperFactory.getAsyncOverFlowAndPersistRegion(cache, null);
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
+    assertNotNull("Unexpected null Oplog", oplogs[0]);
+
+    doChannelFlushWithFailures(oplogs, 1 /* write failure */);
+  }
+
+  @Test
+  public void testChannelWriteRetriesOnFailureDuringFlush() throws Exception {
+    region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null);
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
+    assertNotNull("Unexpected null Oplog", oplogs[0]);
+
+    doChannelFlushWithFailures(oplogs, 1 /* write failure */);
+  }
+
+  @Test
+  public void testChannelRecoversFromWriteFailureRepeatedRetriesDuringFlush() throws Exception
{
+    region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null);
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
+    assertNotNull("Unexpected null Oplog", oplogs[0]);
+
+    doChannelFlushWithFailures(oplogs, 3 /* write failures */);
+  }
+
+  @Test
+  public void testOplogFlushThrowsIOExceptioniWhenNumberOfChannelWriteRetriesExceedsLimit()
+      throws Exception {
+    expectedException.expect(DiskAccessException.class);
+    expectedException.expectCause(instanceOf(IOException.class));
+    region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null);
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
+    assertNotNull("Unexpected null Oplog", oplogs[0]);
+
+    doChannelFlushWithFailures(oplogs, 6 /* exceeds the retry limit in Oplog */);
+  }
+
+  class FakeChannelWriteBBArray implements Answer<Long> {
+
+    @Override
+    public Long answer(InvocationOnMock invocation) throws Throwable {
+      bbArray = ol.bbArray;
+      return fakeWriteBBArray(ol, bbArray);
+    }
+  }
+
+  private long fakeWriteBBArray(Oplog ol, ByteBuffer[] bbA) throws IOException {
+    if (nFakeChannelWrites > 0) {
+      for (int i = 0; i < bbA.length; ++i) {
+        bbA[i].position(bbA[i].limit());
+      }
+      --nFakeChannelWrites;
+      return 0;
+    }
+    doCallRealMethod().when(spyCh).write(bbA);
+    return spyCh.write(bbA);
+  }
+
+  private void doChannelBBArrayFlushWithFailures(Oplog[] oplogs, int numFailures)
+      throws IOException {
+    nFakeChannelWrites = numFailures;
+    ol = oplogs[0];
+    ch = ol.getFileChannel();
+    spyCh = spy(ch);
+    ol.testSetCrfChannel(spyCh);
+
+    byte[] entry1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+    byte[] entry2 = {16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31};
+    byte[] entry3 = {32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47};
+
+    bb1 = ByteBuffer.allocateDirect(entry1.length);
+    bb2 = ByteBuffer.allocateDirect(entry2.length);
+    ByteBuffer[] bbArray = ol.bbArray;
+    try {
+      // Force channel.write() failures when writing the first entry
+      doAnswer(new FakeChannelWriteBBArray()).when(spyCh).write(bbArray);
+      long chStartPos = ol.getFileChannel().position();
+      bb1.clear();
+      bb1.put(entry1);
+      bb2.clear();
+      bb2.put(entry2);
+      bb2.flip();
+      ol.flush(bb1, bb2);
+
+      // Write the 2nd entry without forced channel failures
+      nFakeChannelWrites = 0;
+      bb1.clear();
+      bb1.put(entry2);
+      bb1 = ol.getWriteBuf();
+      ol.flushAll(true);
+      long chEndPos = ol.getFileChannel().position();
+      assertEquals("Change in channel position does not equal the size of the data flushed",
+          entry1.length + entry2.length, chEndPos - chStartPos);
+      ByteBuffer dst = ByteBuffer.allocateDirect(entry1.length);
+      ol.getFileChannel().position(chStartPos);
+      ol.getFileChannel().read(dst);
+      verifyBB(dst, entry1);
+    } finally {
+      region.destroyRegion();
+    }
+  }
+
+  @Test
+  public void testChannelRecoversFromWriteFailureOfByteBufferArray() throws Exception {
+    region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null);
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
+    assertNotNull("Unexpected null Oplog", oplogs[0]);
+
+    doChannelBBArrayFlushWithFailures(oplogs, 1 /* write failures */);
+  }
+
+  @Test
+  public void testOplogFlushOfByteBufferArrayThrowsIOExceptionWhenNumberOfChannelWriteRetriesExceedsLimit()
+      throws Exception {
+    expectedException.expect(instanceOf(IOException.class));
+    region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null);
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
+    assertNotNull("Unexpected null Oplog", oplogs[0]);
+
+    doChannelBBArrayFlushWithFailures(oplogs, 6 /* exceeds the retry limit in Oplog */);
+  }
+}


Mime
View raw message