hadoop-ozone-commits mailing list archives

From lj...@apache.org
Subject [hadoop-ozone] branch master updated: HDDS-2542. Race condition between read and write stateMachineData. (#310)
Date Thu, 09 Jan 2020 14:23:47 GMT
This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b256aa  HDDS-2542. Race condition between read and write stateMachineData. (#310)
4b256aa is described below

commit 4b256aa473e3bce0884f47985864afe2a9966c6a
Author: Lokesh Jain <ljain@apache.org>
AuthorDate: Thu Jan 9 19:53:35 2020 +0530

    HDDS-2542. Race condition between read and write stateMachineData. (#310)
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   7 -
 .../java/org/apache/hadoop/hdds/utils/Cache.java   |  36 +++++
 .../hadoop/hdds/utils/ResourceLimitCache.java      |  91 +++++++++++
 .../hadoop/hdds/utils/ResourceSemaphore.java       | 170 +++++++++++++++++++++
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   9 --
 .../common/src/main/resources/ozone-default.xml    |   8 -
 .../hadoop/hdds/utils/TestResourceLimitCache.java  |  87 +++++++++++
 .../hadoop/hdds/utils/TestResourceSemaphore.java   |  76 +++++++++
 .../server/ratis/ContainerStateMachine.java        |  62 +++++---
 .../transport/server/ratis/XceiverServerRatis.java |  14 +-
 .../ozone/freon/TestOzoneClientKeyGenerator.java   |  95 ++++++++++++
 11 files changed, 594 insertions(+), 61 deletions(-)
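
In short: the time-expiry Guava cache for stateMachineData inside ContainerStateMachine is replaced by a new ResourceLimitCache (backed by ResourceSemaphore), keyed by log index and bounded by the leader's pending-request count and pending-bytes limits instead of a wall-clock expiry interval. Entries are now removed explicitly on applyTransaction and truncateStateMachineData, and the now-unused dfs.container.ratis.statemachine.cache.expiry.interval configuration is dropped.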

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 00d326e..737add0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -115,13 +115,6 @@ public final class ScmConfigKeys {
   public static final String
       DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT = "1GB";
 
-  // expiry interval stateMachineData cache entry inside containerStateMachine
-  public static final String
-      DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL =
-      "dfs.container.ratis.statemachine.cache.expiry.interval";
-  public static final String
-      DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT =
-      "10s";
   public static final String DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY =
       "dfs.ratis.client.request.timeout.duration";
   public static final TimeDuration
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Cache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Cache.java
new file mode 100644
index 0000000..efeb69f
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Cache.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+import java.util.function.Predicate;
+
+/**
+ * Cache interface.
+ */
+public interface Cache<K, V> {
+
+  V get(K key);
+
+  V put(K key, V value) throws InterruptedException;
+
+  V remove(K key);
+
+  void removeIf(Predicate<K> predicate);
+
+  void clear();
+}
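
For illustration only (not part of the commit), a short sketch of the contract this interface defines: put() may block, which is why it declares the checked InterruptedException, and removeIf() prunes entries by a key predicate. The sketch uses the ResourceLimitCache implementation added in the next file, with a hypothetical permit function that charges one permit per character of the value:

    // Usage sketch, assuming the ResourceLimitCache added below.
    static void demo() throws InterruptedException {
      Cache<Long, String> cache = new ResourceLimitCache<>(
          new java.util.concurrent.ConcurrentHashMap<>(),
          (k, v) -> new int[] {v.length()}, 8);  // budget: 8 permits
      cache.put(1L, "abcd");             // acquires 4 of 8 permits
      cache.put(2L, "efgh");             // acquires the remaining 4
      // cache.put(3L, "x") would block here until permits are released
      cache.remove(1L);                  // releases 4 permits
      cache.removeIf(key -> key >= 2L);  // drop entries whose key matches
    }
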
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceLimitCache.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceLimitCache.java
new file mode 100644
index 0000000..5dda249
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceLimitCache.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+
+/**
+ * Cache with resource limit constraints. At any time all entries in the cache
+ * satisfy the resource limit constraints in the constructor. New put
+ * operations are blocked until resources are released via remove or clear
+ * operation.
+ */
+public class ResourceLimitCache<K, V> implements Cache<K, V> {
+  private final java.util.concurrent.ConcurrentMap<K, V> map;
+  private final ResourceSemaphore.Group group;
+  private final BiFunction<K, V, int[]> permitsSupplier;
+
+  public ResourceLimitCache(java.util.concurrent.ConcurrentMap<K, V> map,
+      BiFunction<K, V, int[]> permitsSupplier, int... limits) {
+    Objects.requireNonNull(map);
+    Objects.requireNonNull(permitsSupplier);
+    Objects.requireNonNull(limits);
+    this.map = map;
+    this.group = new ResourceSemaphore.Group(limits);
+    this.permitsSupplier = permitsSupplier;
+  }
+
+  @Override
+  public V get(K key) {
+    Objects.requireNonNull(key);
+    return map.get(key);
+  }
+
+  @Override
+  public V put(K key, V value) throws InterruptedException {
+    Objects.requireNonNull(key);
+    Objects.requireNonNull(value);
+
+    // remove the old key to release the permits
+    V oldVal = remove(key);
+    int[] permits = permitsSupplier.apply(key, value);
+    group.acquire(permits);
+    try {
+      map.put(key, value);
+    } catch (Throwable t) {
+      group.release(permits);
+    }
+    return oldVal;
+  }
+
+  @Override
+  public V remove(K key) {
+    Objects.requireNonNull(key);
+    V val = map.remove(key);
+    if (val != null) {
+      group.release(permitsSupplier.apply(key, val));
+    }
+    return val;
+  }
+
+  @Override
+  public void removeIf(Predicate<K> predicate) {
+    Objects.requireNonNull(predicate);
+    map.keySet().removeIf(predicate);
+  }
+
+  @Override
+  public void clear() {
+    for (K key : map.keySet()) {
+      remove(key);
+    }
+  }
+}
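
Two subtleties are visible above. First, if map.put throws, put() releases the freshly acquired permits but swallows the Throwable, so the caller cannot tell the insert failed. A stricter variant (a sketch, not the committed code) would rethrow after releasing:

    public V putStrict(K key, V value) throws InterruptedException {
      V oldVal = remove(key);            // free permits held by any old entry
      int[] permits = permitsSupplier.apply(key, value);
      group.acquire(permits);            // may block until capacity frees up
      try {
        map.put(key, value);
      } catch (Throwable t) {
        group.release(permits);          // undo the acquire on failure
        throw t;                         // propagate instead of swallowing
      }
      return oldVal;
    }

Since a plain ConcurrentHashMap.put can only fail with unchecked exceptions, Java's precise-rethrow rules let "throw t;" compile without declaring Throwable. Second, removeIf() deletes entries straight from the backing map without releasing their permits, unlike remove(); callers that rely on it to free capacity would need remove()-based iteration instead.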
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceSemaphore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceSemaphore.java
new file mode 100644
index 0000000..96d5996
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/ResourceSemaphore.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+
+import org.apache.ratis.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A {@link Semaphore} with a limit for a resource.
+ *
+ * After {@link #close()}, the resource becomes unavailable,
+ * i.e. any acquire will not succeed.
+ */
+public class ResourceSemaphore extends Semaphore {
+  private final int limit;
+  private final AtomicBoolean reducePermits = new AtomicBoolean();
+  private final AtomicBoolean isClosed = new AtomicBoolean();
+
+  public ResourceSemaphore(int limit) {
+    super(limit, true);
+    Preconditions.assertTrue(limit > 0, () -> "limit = " + limit + " <= 0");
+    this.limit = limit;
+  }
+
+  @Override
+  public void release() {
+    release(1);
+  }
+
+  @Override
+  public void release(int permits) {
+    assertRelease(permits);
+    super.release(permits);
+    assertAvailable();
+  }
+
+  private void assertRelease(int toRelease) {
+    Preconditions
+        .assertTrue(toRelease >= 0, () -> "toRelease = " + toRelease + " < 0");
+    final int available = assertAvailable();
+    final int permits = Math.addExact(available, toRelease);
+    Preconditions.assertTrue(permits <= limit,
+        () -> "permits = " + permits + " > limit = " + limit);
+  }
+
+  private int assertAvailable() {
+    final int available = availablePermits();
+    Preconditions
+        .assertTrue(available >= 0, () -> "available = " + available + " < 0");
+    return available;
+  }
+
+  public int used() {
+    return limit - availablePermits();
+  }
+
+  /** Close the resource. */
+  public void close() {
+    if (reducePermits.compareAndSet(false, true)) {
+      reducePermits(limit);
+      isClosed.set(true);
+    }
+  }
+
+  public boolean isClosed() {
+    return isClosed.get();
+  }
+
+  @Override
+  public String toString() {
+    return (isClosed()? "closed/": availablePermits() + "/") + limit;
+  }
+
+  /**
+   * Track a group of resources with a list of {@link ResourceSemaphore}s.
+   */
+  public static class Group {
+    private final List<ResourceSemaphore> resources;
+
+    public Group(int... limits) {
+      final List<ResourceSemaphore> list = new ArrayList<>(limits.length);
+      for(int limit : limits) {
+        list.add(new ResourceSemaphore(limit));
+      }
+      this.resources = Collections.unmodifiableList(list);
+    }
+
+    int resourceSize() {
+      return resources.size();
+    }
+
+    protected ResourceSemaphore get(int i) {
+      return resources.get(i);
+    }
+
+    boolean tryAcquire(int... permits) {
+      Preconditions.assertTrue(permits.length == resources.size(),
+          () -> "items.length = " + permits.length + " != resources.size() = "
+              + resources.size());
+      int i = 0;
+      // try acquiring all resources
+      for (; i < permits.length; i++) {
+        if (!resources.get(i).tryAcquire(permits[i])) {
+          break;
+        }
+      }
+      if (i == permits.length) {
+        return true; // successfully acquired all resources
+      }
+
+      // failed at i, releasing all previous resources
+      for(i--; i >= 0; i--) {
+        resources.get(i).release(permits[i]);
+      }
+      return false;
+    }
+
+    public void acquire(int... permits) throws InterruptedException {
+      Preconditions.assertTrue(permits.length == resources.size(),
+          () -> "items.length = " + permits.length + " != resources.size() = "
+              + resources.size());
+      for (int i = 0; i < permits.length; i++) {
+        resources.get(i).acquire(permits[i]);
+      }
+    }
+
+    protected void release(int... permits) {
+      for(int i = resources.size() - 1; i >= 0; i--) {
+        resources.get(i).release(permits[i]);
+      }
+    }
+
+    public void close() {
+      for(int i = resources.size() - 1; i >= 0; i--) {
+        resources.get(i).close();
+      }
+    }
+
+    public boolean isClosed() {
+      return resources.get(resources.size() - 1).isClosed();
+    }
+
+    @Override
+    public String toString() {
+      return resources + ",size=" + resources.size();
+    }
+  }
+}
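
A small sketch (illustrative, not part of the commit) of Group's all-or-nothing tryAcquire: acquisition proceeds semaphore by semaphore, and a failure part-way through rolls back everything acquired so far:

    // Two dimensions: 2 "slots" and 1024 "bytes".
    ResourceSemaphore.Group group = new ResourceSemaphore.Group(2, 1024);
    boolean first = group.tryAcquire(1, 512);   // true: 1 slot + 512 bytes taken
    boolean second = group.tryAcquire(1, 1024); // false: only 512 bytes remain,
                                                // so the slot briefly acquired
                                                // is released again
    group.release(1, 512);                      // back to 0 slots, 0 bytes used

Note that the blocking acquire() has no such rollback; it simply acquires each dimension in order, so an interrupt part-way through leaves earlier dimensions held.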
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 1700926..e637a09 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -267,15 +267,6 @@ public final class OzoneConfigKeys {
       DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT =
       ScmConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT;
 
-  public static final String
-      DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL =
-      ScmConfigKeys.
-          DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL;
-  public static final String
-      DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT =
-      ScmConfigKeys.
-          DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT;
-
   public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
       "dfs.container.ratis.datanode.storage.dir";
   public static final String DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index c83a5bf..8e8ec36 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -253,14 +253,6 @@
     </description>
   </property>
   <property>
-    <name>dfs.container.ratis.statemachine.cache.expiry.interval</name>
-    <value>10s</value>
-    <tag>OZONE, RATIS, PERFORMANCE</tag>
-    <description>The interval till which the stateMachine data in ratis
-      will be cached inside the ContainerStateMachine.
-    </description>
-  </property>
-  <property>
     <name>dfs.ratis.client.request.timeout.duration</name>
     <value>3s</value>
     <tag>OZONE, RATIS, MANAGEMENT</tag>
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceLimitCache.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceLimitCache.java
new file mode 100644
index 0000000..34d6b3c
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceLimitCache.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test for ResourceLimitCache.
+ */
+public class TestResourceLimitCache {
+
+  @Test
+  public void testResourceLimitCache()
+      throws InterruptedException, TimeoutException {
+    Cache<Integer, String> resourceCache =
+        new ResourceLimitCache<>(new ConcurrentHashMap<>(),
+            (k, v) -> new int[] {k}, 10);
+    resourceCache.put(6, "a");
+    resourceCache.put(4, "a");
+
+    // put should pass as key 4 will be overwritten
+    resourceCache.put(4, "a");
+
+    // Create a future which blocks to put 1. Currently map has acquired 10
+    // permits out of 10
+    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
+      try {
+        return resourceCache.put(1, "a");
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      return null;
+    });
+    Assert.assertTrue(!future.isDone());
+    Thread.sleep(100);
+    Assert.assertTrue(!future.isDone());
+
+    // remove 4 so that permits are released for key 1 to be put. Currently map
+    // has acquired 6 permits out of 10
+    resourceCache.remove(4);
+
+    GenericTestUtils.waitFor(future::isDone, 100, 1000);
+    // map now has key 1
+    Assert.assertTrue(future.isDone() && !future.isCompletedExceptionally());
+    Assert.assertNotNull(resourceCache.get(1));
+
+    // Create a future which blocks to put 4. Currently map has acquired 7
+    // permits out of 10
+    future = CompletableFuture.supplyAsync(() -> {
+      try {
+        return resourceCache.put(4, "a");
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      return null;
+    });
+    Assert.assertTrue(!future.isDone());
+    Thread.sleep(100);
+    Assert.assertTrue(!future.isDone());
+
+    // Cancel the future for putting key 4
+    future.cancel(true);
+    // remove key 1 so currently map has acquired 6 permits out of 10
+    resourceCache.remove(1);
+    Assert.assertNull(resourceCache.get(4));
+  }
+}
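
One caveat about the cancellation step above: CompletableFuture.cancel(true) does not interrupt the thread running the supplier, so the supplier blocked in put(4, "a") keeps waiting and could still insert the key once permits free up; the test merely stops observing it. If interruption is actually needed, a Future obtained from an ExecutorService provides it. A sketch (not part of the commit), reusing the test's resourceCache:

    java.util.concurrent.ExecutorService executor =
        java.util.concurrent.Executors.newSingleThreadExecutor();
    java.util.concurrent.Future<String> pendingPut =
        executor.submit(() -> resourceCache.put(4, "a"));
    pendingPut.cancel(true);  // interrupts the thread blocked in acquire, so
                              // put() unwinds with InterruptedException
    executor.shutdown();
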
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceSemaphore.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceSemaphore.java
new file mode 100644
index 0000000..cbdd558
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestResourceSemaphore.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for ResourceSemaphore.
+ */
+public class TestResourceSemaphore {
+  @Test(timeout = 1000)
+  public void testGroup() {
+    final ResourceSemaphore.Group g = new ResourceSemaphore.Group(3, 1);
+
+    assertUsed(g, 0, 0);
+    assertAcquire(g, true, 1, 1);
+    assertUsed(g, 1, 1);
+    assertAcquire(g, false, 1, 1);
+    assertUsed(g, 1, 1);
+    assertAcquire(g, false, 0, 1);
+    assertUsed(g, 1, 1);
+    assertAcquire(g, true, 1, 0);
+    assertUsed(g, 2, 1);
+    assertAcquire(g, true, 1, 0);
+    assertUsed(g, 3, 1);
+    assertAcquire(g, false, 1, 0);
+    assertUsed(g, 3, 1);
+
+    g.release(1, 1);
+    assertUsed(g, 2, 0);
+    g.release(2, 0);
+    assertUsed(g, 0, 0);
+    g.release(0, 0);
+    assertUsed(g, 0, 0);
+
+    try {
+      g.release(1, 0);
+      Assert.fail("Should have failed.");
+    } catch (IllegalStateException e){
+    }
+    try {
+      g.release(0, 1);
+      Assert.fail("Should have failed.");
+    } catch (IllegalStateException e){
+    }
+  }
+
+  static void assertUsed(ResourceSemaphore.Group g, int... expected) {
+    Assert.assertEquals(expected.length, g.resourceSize());
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertEquals(expected[i], g.get(i).used());
+    }
+  }
+
+  static void assertAcquire(ResourceSemaphore.Group g, boolean expected,
+      int... permits) {
+    final boolean computed = g.tryAcquire(permits);
+    Assert.assertEquals(expected, computed);
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 4aacedc..dbf376f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -20,9 +20,8 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 
@@ -30,6 +29,8 @@ import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.utils.Cache;
+import org.apache.hadoop.hdds.utils.ResourceLimitCache;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.util.Time;
@@ -75,7 +76,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -83,8 +83,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.concurrent.Executors;
@@ -158,7 +156,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   @SuppressWarnings("parameternumber")
   public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
       ContainerController containerController, ThreadPoolExecutor chunkExecutor,
-      XceiverServerRatis ratisServer, long expiryInterval, Configuration conf) {
+      XceiverServerRatis ratisServer, Configuration conf) {
     this.gid = gid;
     this.dispatcher = dispatcher;
     this.containerController = containerController;
@@ -167,11 +165,17 @@ public class ContainerStateMachine extends BaseStateMachine {
     metrics = CSMMetrics.create(gid);
     this.writeChunkFutureMap = new ConcurrentHashMap<>();
     applyTransactionCompletionMap = new ConcurrentHashMap<>();
-    stateMachineDataCache = CacheBuilder.newBuilder()
-        .expireAfterAccess(expiryInterval, TimeUnit.MILLISECONDS)
-        // set the limit on no of cached entries equal to no of max threads
-        // executing writeStateMachineData
-        .maximumSize(chunkExecutor.getCorePoolSize()).build();
+    int numPendingRequests = conf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_NUM_PENDING_REQUESTS,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_NUM_PENDING_REQUESTS_DEFAULT
+    );
+    int pendingRequestsByteLimit = (int) conf.getStorageSize(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT,
+        StorageUnit.BYTES);
+    stateMachineDataCache = new ResourceLimitCache<>(new ConcurrentHashMap<>(),
+        (index, data) -> new int[] {1, data.size()}, numPendingRequests,
+        pendingRequestsByteLimit);
     this.container2BCSIDMap = new ConcurrentHashMap<>();
 
     final int numContainerOpExecutors = conf.getInt(
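
Spelled out, the permit function above charges each cached entry one permit in the request-count dimension and data.size() permits in the byte dimension, so the cache holds at most numPendingRequests entries and at most pendingRequestsByteLimit bytes of chunk data; a writeStateMachineData call that would exceed either bound blocks inside put() until earlier entries are removed. A sketch with hypothetical small limits (ByteString being the type the state machine caches):

    // Assumed numbers for illustration: 2 pending requests, 8 MB of bytes.
    Cache<Long, ByteString> cache = new ResourceLimitCache<>(
        new ConcurrentHashMap<>(),
        (index, data) -> new int[] {1, data.size()},
        2, 8 * 1024 * 1024);
    // Two 4 MB chunks exhaust both limits; caching a third blocks until
    // applyTransaction or truncate removes an earlier entry.
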
@@ -417,7 +421,7 @@ public class ContainerStateMachine extends BaseStateMachine {
       if (((RaftServerProxy) server).getImpl(gid).isLeader()) {
         stateMachineDataCache.put(entryIndex, write.getData());
       }
-    } catch (IOException ioe) {
+    } catch (IOException | InterruptedException ioe) {
       return completeExceptionally(ioe);
     }
     DispatcherContext context =
@@ -466,6 +470,9 @@ public class ContainerStateMachine extends BaseStateMachine {
             write.getChunkData().getChunkName() + " Error message: " +
             r.getMessage() + " Container Result: " + r.getResult());
         metrics.incNumWriteDataFails();
+        // If the write fails, we currently mark the stateMachine as
+        // unhealthy, which leads to a pipeline close. Any change in that
+        // behavior requires handling the cached entry for the write chunk.
         stateMachineHealthy.set(false);
         raftFuture.completeExceptionally(sce);
       } else {
@@ -589,9 +596,13 @@ public class ContainerStateMachine extends BaseStateMachine {
    * Reads the Entry from the Cache or loads it back by reading from disk.
    */
   private ByteString getCachedStateMachineData(Long logIndex, long term,
-      ContainerCommandRequestProto requestProto) throws ExecutionException {
-    return stateMachineDataCache.get(logIndex,
-        () -> readStateMachineData(requestProto, term, logIndex));
+      ContainerCommandRequestProto requestProto)
+      throws IOException {
+    ByteString data = stateMachineDataCache.get(logIndex);
+    if (data == null) {
+      data = readStateMachineData(requestProto, term, logIndex);
+    }
+    return data;
   }
 
   /**
@@ -637,7 +648,7 @@ public class ContainerStateMachine extends BaseStateMachine {
             future.complete(
                 getCachedStateMachineData(entry.getIndex(), entry.getTerm(),
                     requestProto));
-          } catch (ExecutionException e) {
+          } catch (IOException e) {
             metrics.incNumReadStateMachineFails();
             future.completeExceptionally(e);
           }
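
This read-path change appears to be the core of the fix: Guava's get(key, loader) populated the cache from inside the read, which could race with a concurrent writeStateMachineData put or an invalidation for the same index. Here a read is a plain lookup that falls back to readStateMachineData on a miss and never writes to the cache, and the checked exception changes from ExecutionException to IOException to match.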
@@ -695,6 +706,10 @@ public class ContainerStateMachine extends BaseStateMachine {
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     long index = trx.getLogEntry().getIndex();
+    // Since the leader and at least one follower have written the data, it
+    // can be removed from the stateMachineDataCache.
+    stateMachineDataCache.remove(index);
+
     DispatcherContext.Builder builder =
         new DispatcherContext.Builder()
             .setTerm(trx.getLogEntry().getTerm())
@@ -803,10 +818,15 @@ public class ContainerStateMachine extends BaseStateMachine {
     return future;
   }
 
+  @Override
+  public CompletableFuture<Void> truncateStateMachineData(long index) {
+    stateMachineDataCache.removeIf(k -> k >= index);
+    return CompletableFuture.completedFuture(null);
+  }
+
   @VisibleForTesting
   public void evictStateMachineCache() {
-    stateMachineDataCache.invalidateAll();
-    stateMachineDataCache.cleanUp();
+    stateMachineDataCache.clear();
   }
 
   @Override
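
Taken together, these hunks tie the cache's lifecycle to the Ratis log instead of a wall clock:

    writeStateMachineData    -> put(entryIndex, data)   (leader only; may block)
    applyTransaction         -> remove(index)           (releases permits)
    truncateStateMachineData -> removeIf(k >= index)    (drops the truncated tail)
    evictStateMachineCache   -> clear()                 (test hook)
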
@@ -820,12 +840,6 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   @Override
-  public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
-      throws IOException {
-    evictStateMachineCache();
-  }
-
-  @Override
   public void notifyLogFailed(Throwable t, LogEntryProto failedEntry) {
     ratisServer.handleNodeLogFailure(gid, t);
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 753d8f5..17a3892 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -57,7 +57,6 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.util.SizeInBytes;
@@ -102,9 +101,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   private final ContainerController containerController;
   private ClientId clientId = ClientId.randomId();
   private final StateContext context;
-  private final ReplicationLevel replicationLevel;
   private long nodeFailureTimeoutMs;
-  private final long cacheEntryExpiryInteval;
   private boolean isStarted = false;
   private DatanodeDetails datanodeDetails;
   private final OzoneConfiguration conf;
@@ -138,14 +135,6 @@ public final class XceiverServerRatis implements XceiverServerSpi {
             new ArrayBlockingQueue<>(queueLimit),
             new ThreadPoolExecutor.CallerRunsPolicy());
     this.context = context;
-    this.replicationLevel =
-        conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
-            OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);
-    cacheEntryExpiryInteval = conf.getTimeDuration(OzoneConfigKeys.
-            DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL,
-        OzoneConfigKeys.
-            DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL_DEFAULT,
-        TimeUnit.MILLISECONDS);
     this.dispatcher = dispatcher;
     this.containerController = containerController;
     this.raftPeerId = RatisHelper.toRaftPeerId(dd);
@@ -162,8 +151,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
 
   private ContainerStateMachine getStateMachine(RaftGroupId gid) {
     return new ContainerStateMachine(gid, dispatcher, containerController,
-        chunkExecutor, this, cacheEntryExpiryInteval,
-        conf);
+        chunkExecutor, this, conf);
   }
 
   private RaftProperties newRaftProperties() {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java
new file mode 100644
index 0000000..315d1ee
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.event.Level;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * Test for OzoneClientKeyGenerator.
+ */
+public class TestOzoneClientKeyGenerator {
+
+  private String path;
+
+  /**
+   * Set up the temporary test directory and enable Ratis debug logging.
+   */
+  @Before
+  public void setup() {
+    path = GenericTestUtils
+        .getTempPath(TestOzoneClientKeyGenerator.class.getSimpleName());
+    GenericTestUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    File baseDir = new File(path);
+    baseDir.mkdirs();
+  }
+
+  /**
+   * Shutdown the MiniOzoneCluster and delete the test directory.
+   */
+  private void shutdown(MiniOzoneCluster cluster) throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+      FileUtils.deleteDirectory(new File(path));
+    }
+  }
+
+  private MiniOzoneCluster startCluster(OzoneConfiguration conf)
+      throws Exception {
+    if (conf == null) {
+      conf = new OzoneConfiguration();
+    }
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(5)
+        .build();
+
+    cluster.waitForClusterToBeReady();
+    cluster.waitTobeOutOfSafeMode();
+    return cluster;
+  }
+
+  @Test
+  public void testOzoneClientKeyGenerator() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    MiniOzoneCluster cluster = startCluster(conf);
+    FileOutputStream out = FileUtils.openOutputStream(new File(path, "conf"));
+    cluster.getConf().writeXml(out);
+    out.getFD().sync();
+    out.close();
+    new Freon().execute(
+        new String[] {"-conf", new File(path, "conf").getAbsolutePath(),
+            "ockg", "-t", "1"});
+    shutdown(cluster);
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org

