hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject hadoop git commit: HDFS-11887. Shared XceiverClient should be closed if there is no open clients to avoid resource leak. Contributed by Mukul Kumar Singh.
Date Tue, 06 Jun 2017 13:21:04 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 b8f471386 -> 245c6fed9


HDFS-11887. Shared XceiverClient should be closed if there is no open clients to avoid resource
leak. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/245c6fed
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/245c6fed
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/245c6fed

Branch: refs/heads/HDFS-7240
Commit: 245c6fed97a8dc7979a68f6de52aaf9cc812287e
Parents: b8f4713
Author: Weiwei Yang <wwei@apache.org>
Authored: Tue Jun 6 21:20:20 2017 +0800
Committer: Weiwei Yang <wwei@apache.org>
Committed: Tue Jun 6 21:20:20 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/scm/ScmConfigKeys.java    |   5 +
 .../org/apache/hadoop/scm/XceiverClient.java    |   3 +-
 .../apache/hadoop/scm/XceiverClientManager.java | 150 ++++++--------
 .../apache/hadoop/scm/XceiverClientRatis.java   |   3 +-
 .../org/apache/hadoop/scm/XceiverClientSpi.java |  55 +++++-
 .../cache/impl/AsyncBlockWriter.java            |   6 +-
 .../apache/hadoop/cblock/TestBufferManager.java |   1 +
 .../hadoop/cblock/TestLocalBlockCache.java      |   1 +
 .../ozone/scm/TestContainerSmallFile.java       |   3 +
 .../ozone/scm/TestXceiverClientManager.java     | 194 +++++++++++++++++++
 10 files changed, 324 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/245c6fed/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index 9553171..d343254 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -35,6 +35,11 @@ public final class ScmConfigKeys {
   public static final int SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT =
       10000;
 
+  public static final String SCM_CONTAINER_CLIENT_MAX_SIZE_KEY =
+      "scm.container.client.max.size";
+  public static final int SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT =
+      256;
+
   public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
       = "dfs.container.ratis.enabled";
   public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT

http://git-wip-us.apache.org/repos/asf/hadoop/blob/245c6fed/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
index fc9092a..eb61709 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
@@ -40,7 +40,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * A Client for the storageContainer protocol.
  */
-public class XceiverClient implements XceiverClientSpi {
+public class XceiverClient extends XceiverClientSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
   private final Pipeline pipeline;
   private final Configuration config;
@@ -55,6 +55,7 @@ public class XceiverClient implements XceiverClientSpi {
    * @param config -- Ozone Config
    */
   public XceiverClient(Pipeline pipeline, Configuration config) {
+    super();
     Preconditions.checkNotNull(pipeline);
     Preconditions.checkNotNull(config);
     this.pipeline = pipeline;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/245c6fed/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
index 637268b..a837348 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
@@ -20,8 +20,9 @@ package org.apache.hadoop.scm;
 
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Callable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import com.google.common.cache.Cache;
@@ -31,8 +32,14 @@ import com.google.common.cache.RemovalNotification;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
-import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
-import static org.apache.hadoop.scm.ScmConfigKeys.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT;
 
 /**
  * XceiverClientManager is responsible for the lifecycle of XceiverClient
@@ -50,8 +57,7 @@ public class XceiverClientManager {
 
   //TODO : change this to SCM configuration class
   private final Configuration conf;
-  private Cache<String, XceiverClientWithAccessInfo> openClient;
-  private final long staleThresholdMs;
+  private final Cache<String, XceiverClientSpi> clientCache;
   private final boolean useRatis;
 
   /**
@@ -61,121 +67,91 @@ public class XceiverClientManager {
    */
   public XceiverClientManager(Configuration conf) {
     Preconditions.checkNotNull(conf);
-    this.staleThresholdMs = conf.getTimeDuration(
+    int maxSize = conf.getInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
+        SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT);
+    long staleThresholdMs = conf.getTimeDuration(
         SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY,
         SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS);
     this.useRatis = conf.getBoolean(
         ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
         ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
     this.conf = conf;
-    this.openClient = CacheBuilder.newBuilder()
-        .expireAfterAccess(this.staleThresholdMs, TimeUnit.MILLISECONDS)
+    this.clientCache = CacheBuilder.newBuilder()
+        .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
+        .maximumSize(maxSize)
         .removalListener(
-            new RemovalListener<String, XceiverClientWithAccessInfo>() {
+            new RemovalListener<String, XceiverClientSpi>() {
             @Override
             public void onRemoval(
-                RemovalNotification<String, XceiverClientWithAccessInfo>
+                RemovalNotification<String, XceiverClientSpi>
                   removalNotification) {
-              // If the reference count is not 0, this xceiver client should not
-              // be evicted, add it back to the cache.
-              XceiverClientWithAccessInfo info = removalNotification.getValue();
-              if (info.hasRefence()) {
-                synchronized (XceiverClientManager.this.openClient) {
-                  XceiverClientManager.this
-                      .openClient.put(removalNotification.getKey(), info);
-                }
+              synchronized (clientCache) {
+                // Mark the entry as evicted
+                XceiverClientSpi info = removalNotification.getValue();
+                info.setEvicted();
               }
             }
           }).build();
   }
 
+  @VisibleForTesting
+  public Cache<String, XceiverClientSpi> getClientCache() {
+    return clientCache;
+  }
+
   /**
-   * Acquires a XceiverClient connected to a container capable of storing the
-   * specified key.
+   * Acquires a SharedXceiverClient connected to a container capable of
+   * storing the specified key.
    *
-   * If there is already a cached XceiverClient, simply return the cached
-   * otherwise create a new one.
+   * If there is already a cached SharedXceiverClient, simply return
+   * the cached otherwise create a new one.
    *
    * @param pipeline the container pipeline for the client connection
-   * @return XceiverClient connected to a container
-   * @throws IOException if an XceiverClient cannot be acquired
+   * @return SharedXceiverClient connected to a container
+   * @throws IOException if an SharedXceiverClient cannot be acquired
    */
-  public XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException {
+  public XceiverClientSpi acquireClient(Pipeline pipeline)
+      throws IOException {
     Preconditions.checkNotNull(pipeline);
     Preconditions.checkArgument(pipeline.getMachines() != null);
     Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
-    String containerName = pipeline.getContainerName();
-    synchronized(openClient) {
-      XceiverClientWithAccessInfo info =
-          openClient.getIfPresent(containerName);
 
-      if (info != null) {
-        // we do have this connection, add reference and return
-        info.incrementReference();
-        return info.getXceiverClient();
-      } else {
-        // connection not found, create new, add reference and return
-        final XceiverClientSpi xceiverClient = useRatis ?
-            XceiverClientRatis.newXceiverClientRatis(pipeline, conf)
-            : new XceiverClient(pipeline, conf);
-        try {
-          xceiverClient.connect();
-        } catch (Exception e) {
-          throw new IOException("Exception connecting XceiverClient.", e);
-        }
-        info = new XceiverClientWithAccessInfo(xceiverClient);
-        info.incrementReference();
-        openClient.put(containerName, info);
-        return xceiverClient;
-      }
+    synchronized (clientCache) {
+      XceiverClientSpi info = getClient(pipeline);
+      info.incrementReference();
+      return info;
     }
   }
 
   /**
-   * Releases an XceiverClient after use.
+   * Releases an SharedXceiverClient after use.
    *
-   * @param xceiverClient client to release
+   * @param client client to release
    */
-  public void releaseClient(XceiverClientSpi xceiverClient) {
-    Preconditions.checkNotNull(xceiverClient);
-    String containerName = xceiverClient.getPipeline().getContainerName();
-    XceiverClientWithAccessInfo info;
-    synchronized (openClient) {
-      info = openClient.getIfPresent(containerName);
-      Preconditions.checkNotNull(info);
-      info.decrementReference();
+  public void releaseClient(XceiverClientSpi client) {
+    Preconditions.checkNotNull(client);
+    synchronized (clientCache) {
+      client.decrementReference();
     }
   }
 
-  /**
-   * A helper class for caching and cleaning XceiverClient. Three parameters:
-   * - the actual XceiverClient object
-   * - a time stamp representing the most recent access (acquire or release)
-   * - a reference count, +1 when acquire, -1 when release
-   */
-  private static class XceiverClientWithAccessInfo {
-    final private XceiverClientSpi xceiverClient;
-    final private AtomicInteger referenceCount;
-
-    XceiverClientWithAccessInfo(XceiverClientSpi xceiverClient) {
-      this.xceiverClient = xceiverClient;
-      this.referenceCount = new AtomicInteger(0);
-    }
-
-    void incrementReference() {
-      this.referenceCount.incrementAndGet();
-    }
-
-    void decrementReference() {
-      this.referenceCount.decrementAndGet();
-    }
-
-    boolean hasRefence() {
-      return this.referenceCount.get() != 0;
-    }
-
-    XceiverClientSpi getXceiverClient() {
-      return xceiverClient;
+  private XceiverClientSpi getClient(Pipeline pipeline)
+      throws IOException {
+    String containerName = pipeline.getContainerName();
+    try {
+      return clientCache.get(containerName,
+          new Callable<XceiverClientSpi>() {
+          @Override
+          public XceiverClientSpi call() throws Exception {
+            XceiverClientSpi client = useRatis ?
+                    XceiverClientRatis.newXceiverClientRatis(pipeline, conf)
+                    : new XceiverClient(pipeline, conf);
+            client.connect();
+            return client;
+          }
+        });
+    } catch (Exception e) {
+      throw new IOException("Exception getting XceiverClient.", e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/245c6fed/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
index a0ad24e..a1ef114 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
@@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * An abstract implementation of {@link XceiverClientSpi} using Ratis.
  * The underlying RPC mechanism can be chosen via the constructor.
  */
-public final  class XceiverClientRatis implements XceiverClientSpi {
+public final class XceiverClientRatis extends XceiverClientSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);
 
   public static XceiverClientRatis newXceiverClientRatis(
@@ -58,6 +58,7 @@ public final  class XceiverClientRatis implements XceiverClientSpi {
 
   /** Constructs a client. */
   private XceiverClientRatis(Pipeline pipeline, RpcType rpcType) {
+    super();
     this.pipeline = pipeline;
     this.rpcType = rpcType;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/245c6fed/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java
index 1cf5a28..b48fed0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java
@@ -18,24 +18,65 @@
 
 package org.apache.hadoop.scm;
 
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.ozone.protocol.proto
+    .ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto
+    .ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * A Client for the storageContainer protocol.
  */
-public interface XceiverClientSpi extends Closeable {
+public abstract class XceiverClientSpi implements Closeable {
+
+  final private AtomicInteger referenceCount;
+  private boolean isEvicted;
+
+  XceiverClientSpi() {
+    this.referenceCount = new AtomicInteger(0);
+    this.isEvicted = false;
+  }
+
+  void incrementReference() {
+    this.referenceCount.incrementAndGet();
+  }
+
+  void decrementReference() {
+    this.referenceCount.decrementAndGet();
+    cleanup();
+  }
+
+  void setEvicted() {
+    isEvicted = true;
+    cleanup();
+  }
+
+  // close the xceiverClient only if,
+  // 1) there is no refcount on the client
+  // 2) it has been evicted from the cache.
+  private void cleanup() {
+    if (referenceCount.get() == 0 && isEvicted) {
+      close();
+    }
+  }
+
+  @VisibleForTesting
+  public int getRefcount() {
+    return referenceCount.get();
+  }
+
   /**
    * Connects to the leader in the pipeline.
    */
-  void connect() throws Exception;
+  public abstract void connect() throws Exception;
 
   @Override
-  void close();
+  public abstract void close();
 
   /**
    * Returns the pipeline of machines that host the container used by this
@@ -43,7 +84,7 @@ public interface XceiverClientSpi extends Closeable {
    *
    * @return pipeline of machines that host the container
    */
-  Pipeline getPipeline();
+  public abstract Pipeline getPipeline();
 
   /**
    * Sends a given command to server and gets the reply back.
@@ -51,6 +92,6 @@ public interface XceiverClientSpi extends Closeable {
    * @return Response to the command
    * @throws IOException
    */
-  ContainerCommandResponseProto sendCommand(
+  public abstract ContainerCommandResponseProto sendCommand(
       ContainerCommandRequestProto request) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/245c6fed/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
index 73e17dd..30817c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
@@ -169,9 +169,10 @@ public class AsyncBlockWriter {
     } else {
       Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
       String containerName = pipeline.getContainerName();
+      XceiverClientSpi client = null;
       try {
         long startTime = Time.monotonicNow();
-        XceiverClientSpi client = parentCache.getClientManager()
+        client = parentCache.getClientManager()
             .acquireClient(parentCache.getPipeline(block.getBlockID()));
         // BUG: fix the trace ID.
         ContainerProtocolCalls.writeSmallFile(client, containerName,
@@ -192,6 +193,9 @@ public class AsyncBlockWriter {
             block.getBlockID(), containerName, ex);
         throw ex;
       } finally {
+        if (client != null) {
+          parentCache.getClientManager().releaseClient(client);
+        }
         block.clearData();
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/245c6fed/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
index ec188dd..e38ca71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java
@@ -112,6 +112,7 @@ public class TestBufferManager {
       // read the list from CBlockServer. So we mimic that action here.
       pipeline.setData(Longs.toByteArray(x));
       containerPipelines.add(pipeline);
+      xceiverClientManager.releaseClient(client);
     }
     return containerPipelines;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/245c6fed/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
index c54a214..eec417a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java
@@ -123,6 +123,7 @@ public class TestLocalBlockCache {
       // read the list from CBlockServer. So we mimic that action here.
       pipeline.setData(Longs.toByteArray(x));
       containerPipelines.add(pipeline);
+      xceiverClientManager.releaseClient(client);
     }
     return containerPipelines;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/245c6fed/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
index b657636..deaad87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
@@ -93,6 +93,7 @@ public class TestContainerSmallFile {
             traceID);
     String readData = response.getData().getData().toStringUtf8();
     Assert.assertEquals("data123", readData);
+    xceiverClientManager.releaseClient(client);
   }
 
   @Test
@@ -111,6 +112,7 @@ public class TestContainerSmallFile {
     ContainerProtos.GetSmallFileResponseProto response =
         ContainerProtocolCalls.readSmallFile(client, containerName, "key",
             traceID);
+    xceiverClientManager.releaseClient(client);
   }
 
   @Test
@@ -133,6 +135,7 @@ public class TestContainerSmallFile {
     ContainerProtos.GetSmallFileResponseProto response =
         ContainerProtocolCalls.readSmallFile(client, invalidName, "key",
             traceID);
+    xceiverClientManager.releaseClient(client);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/245c6fed/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
new file mode 100644
index 0000000..7300384
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm;
+
+import com.google.common.cache.Cache;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.scm
+    .ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
+
+/**
+ * Test for XceiverClientManager caching and eviction.
+ */
+public class TestXceiverClientManager {
+  private static OzoneConfiguration config;
+  private static MiniOzoneCluster cluster;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void init() throws IOException {
+    config = new OzoneConfiguration();
+    cluster = new MiniOzoneCluster.Builder(config)
+        .numDataNodes(1)
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    storageContainerLocationClient = cluster
+        .createStorageContainerLocationClient();
+  }
+
+  @Test
+  public void testCaching() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+
+    String containerName1 = "container" + RandomStringUtils.randomNumeric(10);
+    Pipeline pipeline1 =
+        storageContainerLocationClient.allocateContainer(containerName1);
+    XceiverClientSpi client1 = clientManager.acquireClient(pipeline1);
+    Assert.assertEquals(client1.getRefcount(), 1);
+    Assert.assertEquals(containerName1,
+        client1.getPipeline().getContainerName());
+
+    String containerName2 = "container" + RandomStringUtils.randomNumeric(10);
+    Pipeline pipeline2 =
+        storageContainerLocationClient.allocateContainer(containerName2);
+    XceiverClientSpi client2 = clientManager.acquireClient(pipeline2);
+    Assert.assertEquals(client2.getRefcount(), 1);
+    Assert.assertEquals(containerName2,
+        client2.getPipeline().getContainerName());
+
+    XceiverClientSpi client3 = clientManager.acquireClient(pipeline1);
+    Assert.assertEquals(client3.getRefcount(), 2);
+    Assert.assertEquals(client1.getRefcount(), 2);
+    Assert.assertEquals(containerName1,
+        client3.getPipeline().getContainerName());
+    Assert.assertEquals(client1, client3);
+    clientManager.releaseClient(client1);
+    clientManager.releaseClient(client2);
+    clientManager.releaseClient(client3);
+  }
+
+  @Test
+  public void testFreeByReference() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 2);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+    Cache<String, XceiverClientSpi> cache =
+        clientManager.getClientCache();
+
+    String containerName1 = "container" + RandomStringUtils.randomNumeric(10);
+    Pipeline pipeline1 =
+        storageContainerLocationClient.allocateContainer(containerName1);
+    XceiverClientSpi client1 = clientManager.acquireClient(pipeline1);
+    Assert.assertEquals(client1.getRefcount(), 1);
+    Assert.assertEquals(containerName1,
+        client1.getPipeline().getContainerName());
+
+    String containerName2 = "container" + RandomStringUtils.randomNumeric(10);
+    Pipeline pipeline2 =
+        storageContainerLocationClient.allocateContainer(containerName2);
+    XceiverClientSpi client2 = clientManager.acquireClient(pipeline2);
+    Assert.assertEquals(client2.getRefcount(), 1);
+    Assert.assertEquals(containerName2,
+        client2.getPipeline().getContainerName());
+    Assert.assertNotEquals(client1, client2);
+
+    String containerName3 = "container" + RandomStringUtils.randomNumeric(10);
+    Pipeline pipeline3 =
+        storageContainerLocationClient.allocateContainer(containerName3);
+    XceiverClientSpi client3 = clientManager.acquireClient(pipeline3);
+    Assert.assertEquals(client3.getRefcount(), 1);
+    Assert.assertEquals(containerName3,
+        client3.getPipeline().getContainerName());
+
+    // least recent container (i.e containerName1) is evicted
+    XceiverClientSpi nonExistent1 = cache.getIfPresent(containerName1);
+    Assert.assertEquals(nonExistent1, null);
+    // However container call should succeed because of refcount on the client.
+    String traceID1 = "trace" + RandomStringUtils.randomNumeric(4);
+    ContainerProtocolCalls.createContainer(client1,  traceID1);
+
+    // After releasing the client, this connection should be closed
+    // and any container operations should fail
+    clientManager.releaseClient(client1);
+    exception.expect(IOException.class);
+    exception.expectMessage("This channel is not connected.");
+    ContainerProtocolCalls.createContainer(client1,  traceID1);
+    clientManager.releaseClient(client2);
+    clientManager.releaseClient(client3);
+  }
+
+  @Test
+  public void testFreeByEviction() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 2);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+    Cache<String, XceiverClientSpi> cache =
+        clientManager.getClientCache();
+
+    String containerName1 = "container" + RandomStringUtils.randomNumeric(10);
+    Pipeline pipeline1 =
+        storageContainerLocationClient.allocateContainer(containerName1);
+    XceiverClientSpi client1 = clientManager.acquireClient(pipeline1);
+    Assert.assertEquals(client1.getRefcount(), 1);
+    Assert.assertEquals(containerName1,
+        client1.getPipeline().getContainerName());
+
+    String containerName2 = "container" + RandomStringUtils.randomNumeric(10);
+    Pipeline pipeline2 =
+        storageContainerLocationClient.allocateContainer(containerName2);
+    XceiverClientSpi client2 = clientManager.acquireClient(pipeline2);
+    Assert.assertEquals(client2.getRefcount(), 1);
+    Assert.assertEquals(containerName2,
+        client2.getPipeline().getContainerName());
+    Assert.assertNotEquals(client1, client2);
+
+    clientManager.releaseClient(client1);
+    Assert.assertEquals(client1.getRefcount(), 0);
+
+    String containerName3 = "container" + RandomStringUtils.randomNumeric(10);
+    Pipeline pipeline3 =
+        storageContainerLocationClient.allocateContainer(containerName3);
+    XceiverClientSpi client3 = clientManager.acquireClient(pipeline3);
+    Assert.assertEquals(client3.getRefcount(), 1);
+    Assert.assertEquals(containerName3,
+        client3.getPipeline().getContainerName());
+
+    // now client 1 should be evicted
+    XceiverClientSpi nonExistent = cache.getIfPresent(containerName1);
+    Assert.assertEquals(nonExistent, null);
+
+    // Any container operation should now fail
+    String traceID2 = "trace" + RandomStringUtils.randomNumeric(4);
+    exception.expect(IOException.class);
+    exception.expectMessage("This channel is not connected.");
+    ContainerProtocolCalls.createContainer(client1, traceID2);
+    clientManager.releaseClient(client2);
+    clientManager.releaseClient(client3);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message