hadoop-common-commits mailing list archives

From inigo...@apache.org
Subject [30/50] [abbrv] hadoop git commit: HDFS-11538. Move ClientProtocol HA proxies into hadoop-hdfs-client. Contributed by Huafeng Wang.
Date Fri, 07 Apr 2017 01:59:26 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
deleted file mode 100644
index 2f6c9bc..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode.ha;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.StandbyException;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.io.retry.MultiException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A FailoverProxyProvider implementation that technically does not "failover"
- * per se. It constructs a wrapper proxy that sends the request to ALL
- * underlying proxies simultaneously. It assumes that in an HA setup there will
- * be only one Active, and that the active should respond faster than any
- * configured standbys. Once it receives a response from any one of the
- * configured proxies, outstanding requests to the other proxies are cancelled.
- */
-public class RequestHedgingProxyProvider<T> extends
-        ConfiguredFailoverProxyProvider<T> {
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(RequestHedgingProxyProvider.class);
-
-  class RequestHedgingInvocationHandler implements InvocationHandler {
-
-    final Map<String, ProxyInfo<T>> targetProxies;
-
-    public RequestHedgingInvocationHandler(
-            Map<String, ProxyInfo<T>> targetProxies) {
-      this.targetProxies = new HashMap<>(targetProxies);
-    }
-
-    /**
-     * Creates an Executor and invokes all proxies concurrently. This
-     * implementation assumes that clients have configured proper socket
-     * timeouts; otherwise the call can block forever.
-     *
-     * @param proxy
-     * @param method
-     * @param args
-     * @return
-     * @throws Throwable
-     */
-    @Override
-    public Object
-    invoke(Object proxy, final Method method, final Object[] args)
-            throws Throwable {
-      Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
-      int numAttempts = 0;
-
-      ExecutorService executor = null;
-      CompletionService<Object> completionService;
-      try {
-        // Optimization: if only 2 proxies are configured and one has failed
-        // over, then we don't need to create a thread pool.
-        targetProxies.remove(toIgnore);
-        if (targetProxies.size() == 1) {
-          ProxyInfo<T> proxyInfo = targetProxies.values().iterator().next();
-          Object retVal = method.invoke(proxyInfo.proxy, args);
-          successfulProxy = proxyInfo;
-          return retVal;
-        }
-        executor = Executors.newFixedThreadPool(proxies.size());
-        completionService = new ExecutorCompletionService<>(executor);
-        for (final Map.Entry<String, ProxyInfo<T>> pEntry :
-                targetProxies.entrySet()) {
-          Callable<Object> c = new Callable<Object>() {
-            @Override
-            public Object call() throws Exception {
-              LOG.trace("Invoking method {} on proxy {}", method,
-                  pEntry.getValue().proxyInfo);
-              return method.invoke(pEntry.getValue().proxy, args);
-            }
-          };
-          proxyMap.put(completionService.submit(c), pEntry.getValue());
-          numAttempts++;
-        }
-
-        Map<String, Exception> badResults = new HashMap<>();
-        while (numAttempts > 0) {
-          Future<Object> callResultFuture = completionService.take();
-          Object retVal;
-          try {
-            retVal = callResultFuture.get();
-            successfulProxy = proxyMap.get(callResultFuture);
-            LOG.debug("Invocation successful on [{}]",
-                successfulProxy.proxyInfo);
-            return retVal;
-          } catch (Exception ex) {
-            ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
-            logProxyException(ex, tProxyInfo.proxyInfo);
-            badResults.put(tProxyInfo.proxyInfo, unwrapException(ex));
-            LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo);
-            numAttempts--;
-          }
-        }
-
-        // At this point we either have all bad results (exceptions)
-        // or we would already have returned a successful result.
-        if (badResults.size() == 1) {
-          throw badResults.values().iterator().next();
-        } else {
-          throw new MultiException(badResults);
-        }
-      } finally {
-        if (executor != null) {
-          LOG.trace("Shutting down threadpool executor");
-          executor.shutdownNow();
-        }
-      }
-    }
-  }
-
-
-  private volatile ProxyInfo<T> successfulProxy = null;
-  private volatile String toIgnore = null;
-
-  public RequestHedgingProxyProvider(
-          Configuration conf, URI uri, Class<T> xface) {
-    this(conf, uri, xface, new DefaultProxyFactory<T>());
-  }
-
-  @VisibleForTesting
-  RequestHedgingProxyProvider(Configuration conf, URI uri,
-                              Class<T> xface, ProxyFactory<T> factory) {
-    super(conf, uri, xface, factory);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public synchronized ProxyInfo<T> getProxy() {
-    if (successfulProxy != null) {
-      return successfulProxy;
-    }
-    Map<String, ProxyInfo<T>> targetProxyInfos = new HashMap<>();
-    StringBuilder combinedInfo = new StringBuilder("[");
-    for (int i = 0; i < proxies.size(); i++) {
-      ProxyInfo<T> pInfo = super.getProxy();
-      incrementProxyIndex();
-      targetProxyInfos.put(pInfo.proxyInfo, pInfo);
-      combinedInfo.append(pInfo.proxyInfo).append(',');
-    }
-    combinedInfo.append(']');
-    T wrappedProxy = (T) Proxy.newProxyInstance(
-            RequestHedgingInvocationHandler.class.getClassLoader(),
-            new Class<?>[]{xface},
-            new RequestHedgingInvocationHandler(targetProxyInfos));
-    return new ProxyInfo<T>(wrappedProxy, combinedInfo.toString());
-  }
-
-  @Override
-  public synchronized void performFailover(T currentProxy) {
-    toIgnore = successfulProxy.proxyInfo;
-    successfulProxy = null;
-  }
-
-  /**
-   * Check the exception returned by the proxy and log a warning message if
-   * it's not a StandbyException (an expected exception).
-   * @param ex Exception to evaluate.
-   * @param proxyInfo Information of the proxy reporting the exception.
-   */
-  private void logProxyException(Exception ex, String proxyInfo) {
-    if (isStandbyException(ex)) {
-      LOG.debug("Invocation returned standby exception on [{}]", proxyInfo);
-    } else {
-      LOG.warn("Invocation returned exception on [{}]", proxyInfo);
-    }
-  }
-
-  /**
-   * Check if the returned exception is caused by a standby namenode.
-   * @param ex Exception to check.
-   * @return True if the exception is caused by a standby namenode.
-   */
-  private boolean isStandbyException(Exception ex) {
-    Exception exception = unwrapException(ex);
-    if (exception instanceof RemoteException) {
-      return ((RemoteException) exception).unwrapRemoteException()
-          instanceof StandbyException;
-    }
-    return false;
-  }
-
-  /**
-   * Unwraps the exception. <p>
-   * Example:
-   * <blockquote><pre>
-   * if ex is
-   * ExecutionException(InvocationTargetException(SomeException))
-   * returns SomeException
-   * </pre></blockquote>
-   *
-   * @return unwrapped exception
-   */
-  private Exception unwrapException(Exception ex) {
-    if (ex != null) {
-      Throwable cause = ex.getCause();
-      if (cause instanceof Exception) {
-        Throwable innerCause = cause.getCause();
-        if (innerCause instanceof Exception) {
-          return (Exception) innerCause;
-        }
-        return (Exception) cause;
-      }
-    }
-    return ex;
-  }
-}
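
For reference, the hedging provider deleted above (it now lives in hadoop-hdfs-client, under the same package) is selected purely through client configuration. A minimal sketch, assuming a hypothetical nameservice "mycluster" and the same host names and configuration keys the test setup further down in this message uses, might look like this:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HedgingClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // HA nameservice wiring (hypothetical hosts, mirroring the test setup below).
    conf.set("dfs.nameservices", "mycluster");
    conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.mycluster.nn1", "machine1.foo.bar:9820");
    conf.set("dfs.namenode.rpc-address.mycluster.nn2", "machine2.foo.bar:9820");
    // Hedge requests across both NameNodes instead of failing over sequentially.
    conf.set("dfs.client.failover.proxy.provider.mycluster",
        "org.apache.hadoop.hdfs.server.namenode.ha.RequestHedgingProxyProvider");

    // Every RPC issued through this FileSystem goes to the wrapper proxy; the
    // first NameNode to respond wins and the outstanding request is cancelled.
    try (FileSystem fs = FileSystem.get(new URI("hdfs://mycluster"), conf)) {
      System.out.println(fs.getStatus().getCapacity());
    }
  }
}

As the class above shows, performFailover() does not switch to a fixed "next" proxy; it only marks the previously successful proxy to be skipped, so the next call hedges across the remaining NameNodes.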

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java
index 7e8621b..6265f44 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java
@@ -42,10 +42,10 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.ha.IPFailoverProxyProvider;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
@@ -333,7 +333,7 @@ public class TestDFSClientFailover {
     private Class<T> xface;
     private T proxy;
     public DummyLegacyFailoverProxyProvider(Configuration conf, URI uri,
-        Class<T> xface) {
+        Class<T> xface, HAProxyFactory<T> proxyFactory) {
       try {
         this.proxy = NameNodeProxies.createNonHAProxy(conf,
             DFSUtilClient.getNNAddress(uri), xface,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
index 7257bbd..14ad6dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
@@ -513,7 +513,7 @@ public class TestDFSUtil {
         NS2_NN2_HOST);
     
     Map<String, Map<String, InetSocketAddress>> map =
-      DFSUtil.getHaNnRpcAddresses(conf);
+        DFSUtilClient.getHaNnRpcAddresses(conf);
 
     assertTrue(HAUtil.isHAEnabled(conf, "ns1"));
     assertTrue(HAUtil.isHAEnabled(conf, "ns2"));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
index 632bbf6..ca44c79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
@@ -292,7 +292,7 @@ public class TestDelegationTokensWithHA {
       nn0.getNameNodeAddress().getPort()));
     nnAddrs.add(new InetSocketAddress("localhost",
       nn1.getNameNodeAddress().getPort()));
-    HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
+    HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
     
     Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
     assertEquals(3, tokens.size());
@@ -321,7 +321,7 @@ public class TestDelegationTokensWithHA {
     }
     
     // reclone the tokens, and see if they match now
-    HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
+    HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
     for (InetSocketAddress addr : nnAddrs) {
       Text ipcDtService = SecurityUtil.buildTokenService(addr);
       Token<DelegationTokenIdentifier> token2 =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58c01fe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
deleted file mode 100644
index 37532d5..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode.ha;
-
-import java.io.EOFException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
-import org.apache.hadoop.io.retry.MultiException;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.StandbyException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.Time;
-import org.apache.log4j.Level;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import com.google.common.collect.Lists;
-
-public class TestRequestHedgingProxyProvider {
-
-  private Configuration conf;
-  private URI nnUri;
-  private String ns;
-
-  @BeforeClass
-  public static void setupClass() throws Exception {
-    GenericTestUtils.setLogLevel(RequestHedgingProxyProvider.LOG, Level.TRACE);
-  }
-
-  @Before
-  public void setup() throws URISyntaxException {
-    ns = "mycluster-" + Time.monotonicNow();
-    nnUri = new URI("hdfs://" + ns);
-    conf = new Configuration();
-    conf.set(DFSConfigKeys.DFS_NAMESERVICES, ns);
-    conf.set(
-        DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, "nn1,nn2");
-    conf.set(
-        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn1",
-        "machine1.foo.bar:9820");
-    conf.set(
-        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn2",
-        "machine2.foo.bar:9820");
-  }
-
-  @Test
-  public void testHedgingWhenOneFails() throws Exception {
-    final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
-      @Override
-      public long[] answer(InvocationOnMock invocation) throws Throwable {
-        Thread.sleep(1000);
-        return new long[]{1};
-      }
-    });
-    final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
-
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
-        new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
-            createFactory(badMock, goodMock));
-    long[] stats = provider.getProxy().proxy.getStats();
-    Assert.assertTrue(stats.length == 1);
-    Mockito.verify(badMock).getStats();
-    Mockito.verify(goodMock).getStats();
-  }
-
-  @Test
-  public void testHedgingWhenOneIsSlow() throws Exception {
-    final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
-      @Override
-      public long[] answer(InvocationOnMock invocation) throws Throwable {
-        Thread.sleep(1000);
-        return new long[]{1};
-      }
-    });
-    final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
-
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
-        new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
-            createFactory(goodMock, badMock));
-    long[] stats = provider.getProxy().proxy.getStats();
-    Assert.assertTrue(stats.length == 1);
-    Assert.assertEquals(1, stats[0]);
-    Mockito.verify(badMock).getStats();
-    Mockito.verify(goodMock).getStats();
-  }
-
-  @Test
-  public void testHedgingWhenBothFail() throws Exception {
-    NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
-    NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(worseMock.getStats()).thenThrow(
-            new IOException("Worse mock !!"));
-
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
-        new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
-            createFactory(badMock, worseMock));
-    try {
-      provider.getProxy().proxy.getStats();
-      Assert.fail("Should fail since both namenodes throw IOException !!");
-    } catch (Exception e) {
-      Assert.assertTrue(e instanceof MultiException);
-    }
-    Mockito.verify(badMock).getStats();
-    Mockito.verify(worseMock).getStats();
-  }
-
-  @Test
-  public void testPerformFailover() throws Exception {
-    final AtomicInteger counter = new AtomicInteger(0);
-    final int[] isGood = {1};
-    final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
-      @Override
-      public long[] answer(InvocationOnMock invocation) throws Throwable {
-        counter.incrementAndGet();
-        if (isGood[0] == 1) {
-          Thread.sleep(1000);
-          return new long[]{1};
-        }
-        throw new IOException("Was Good mock !!");
-      }
-    });
-    final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
-      @Override
-      public long[] answer(InvocationOnMock invocation) throws Throwable {
-        counter.incrementAndGet();
-        if (isGood[0] == 2) {
-          Thread.sleep(1000);
-          return new long[]{2};
-        }
-        throw new IOException("Bad mock !!");
-      }
-    });
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
-            new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
-                    createFactory(goodMock, badMock));
-    long[] stats = provider.getProxy().proxy.getStats();
-    Assert.assertTrue(stats.length == 1);
-    Assert.assertEquals(1, stats[0]);
-    Assert.assertEquals(2, counter.get());
-    Mockito.verify(badMock).getStats();
-    Mockito.verify(goodMock).getStats();
-
-    stats = provider.getProxy().proxy.getStats();
-    Assert.assertTrue(stats.length == 1);
-    Assert.assertEquals(1, stats[0]);
-    // Ensure only the previously successful one is invoked
-    Mockito.verifyNoMoreInteractions(badMock);
-    Assert.assertEquals(3, counter.get());
-
-    // Flip to standby.. so now this should fail
-    isGood[0] = 2;
-    try {
-      provider.getProxy().proxy.getStats();
-      Assert.fail("Should fail since previously successful proxy now fails ");
-    } catch (Exception ex) {
-      Assert.assertTrue(ex instanceof IOException);
-    }
-
-    Assert.assertEquals(4, counter.get());
-
-    provider.performFailover(provider.getProxy().proxy);
-    stats = provider.getProxy().proxy.getStats();
-    Assert.assertTrue(stats.length == 1);
-    Assert.assertEquals(2, stats[0]);
-
-    // Counter should update only once
-    Assert.assertEquals(5, counter.get());
-
-    stats = provider.getProxy().proxy.getStats();
-    Assert.assertTrue(stats.length == 1);
-    Assert.assertEquals(2, stats[0]);
-
-    // Counter updates only once now
-    Assert.assertEquals(6, counter.get());
-
-    // Flip back to old active.. so now this should fail
-    isGood[0] = 1;
-    try {
-      provider.getProxy().proxy.getStats();
-      Assert.fail("Should fail since previously successful proxy now fails ");
-    } catch (Exception ex) {
-      Assert.assertTrue(ex instanceof IOException);
-    }
-
-    Assert.assertEquals(7, counter.get());
-
-    provider.performFailover(provider.getProxy().proxy);
-    stats = provider.getProxy().proxy.getStats();
-    Assert.assertTrue(stats.length == 1);
-    // Ensure correct proxy was called
-    Assert.assertEquals(1, stats[0]);
-  }
-
-  @Test
-  public void testPerformFailoverWith3Proxies() throws Exception {
-    conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
-            "nn1,nn2,nn3");
-    conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns + ".nn3",
-            "machine3.foo.bar:9820");
-
-    final AtomicInteger counter = new AtomicInteger(0);
-    final int[] isGood = {1};
-    final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
-      @Override
-      public long[] answer(InvocationOnMock invocation) throws Throwable {
-        counter.incrementAndGet();
-        if (isGood[0] == 1) {
-          Thread.sleep(1000);
-          return new long[]{1};
-        }
-        throw new IOException("Was Good mock !!");
-      }
-    });
-    final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
-      @Override
-      public long[] answer(InvocationOnMock invocation) throws Throwable {
-        counter.incrementAndGet();
-        if (isGood[0] == 2) {
-          Thread.sleep(1000);
-          return new long[]{2};
-        }
-        throw new IOException("Bad mock !!");
-      }
-    });
-    final NamenodeProtocols worseMock = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(worseMock.getStats()).thenAnswer(new Answer<long[]>() {
-      @Override
-      public long[] answer(InvocationOnMock invocation) throws Throwable {
-        counter.incrementAndGet();
-        if (isGood[0] == 3) {
-          Thread.sleep(1000);
-          return new long[]{3};
-        }
-        throw new IOException("Worse mock !!");
-      }
-    });
-
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
-            new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
-                    createFactory(goodMock, badMock, worseMock));
-    long[] stats = provider.getProxy().proxy.getStats();
-    Assert.assertTrue(stats.length == 1);
-    Assert.assertEquals(1, stats[0]);
-    Assert.assertEquals(3, counter.get());
-    Mockito.verify(badMock).getStats();
-    Mockito.verify(goodMock).getStats();
-    Mockito.verify(worseMock).getStats();
-
-    stats = provider.getProxy().proxy.getStats();
-    Assert.assertTrue(stats.length == 1);
-    Assert.assertEquals(1, stats[0]);
-    // Ensure only the previously successful one is invoked
-    Mockito.verifyNoMoreInteractions(badMock);
-    Mockito.verifyNoMoreInteractions(worseMock);
-    Assert.assertEquals(4, counter.get());
-
-    // Flip to standby.. so now this should fail
-    isGood[0] = 2;
-    try {
-      provider.getProxy().proxy.getStats();
-      Assert.fail("Should fail since previously successful proxy now fails ");
-    } catch (Exception ex) {
-      Assert.assertTrue(ex instanceof IOException);
-    }
-
-    Assert.assertEquals(5, counter.get());
-
-    provider.performFailover(provider.getProxy().proxy);
-    stats = provider.getProxy().proxy.getStats();
-    Assert.assertTrue(stats.length == 1);
-    Assert.assertEquals(2, stats[0]);
-
-    // Counter updates twice since both proxies are tried on failure
-    Assert.assertEquals(7, counter.get());
-
-    stats = provider.getProxy().proxy.getStats();
-    Assert.assertTrue(stats.length == 1);
-    Assert.assertEquals(2, stats[0]);
-
-    // Counter updates only once now
-    Assert.assertEquals(8, counter.get());
-
-    // Flip to Other standby.. so now this should fail
-    isGood[0] = 3;
-    try {
-      provider.getProxy().proxy.getStats();
-      Assert.fail("Should fail since previously successful proxy now fails ");
-    } catch (Exception ex) {
-      Assert.assertTrue(ex instanceof IOException);
-    }
-
-    // Counter should update only once
-    Assert.assertEquals(9, counter.get());
-
-    provider.performFailover(provider.getProxy().proxy);
-    stats = provider.getProxy().proxy.getStats();
-    Assert.assertTrue(stats.length == 1);
-
-    // Ensure correct proxy was called
-    Assert.assertEquals(3, stats[0]);
-
-    // Counter updates twice since both proxies are tried on failure
-    Assert.assertEquals(11, counter.get());
-
-    stats = provider.getProxy().proxy.getStats();
-    Assert.assertTrue(stats.length == 1);
-    Assert.assertEquals(3, stats[0]);
-
-    // Counter updates only once now
-    Assert.assertEquals(12, counter.get());
-  }
-
-  @Test
-  public void testHedgingWhenFileNotFoundException() throws Exception {
-    NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
-    Mockito
-        .when(active.getBlockLocations(Matchers.anyString(),
-            Matchers.anyLong(), Matchers.anyLong()))
-        .thenThrow(new RemoteException("java.io.FileNotFoundException",
-            "File does not exist!"));
-
-    NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
-    Mockito
-        .when(standby.getBlockLocations(Matchers.anyString(),
-            Matchers.anyLong(), Matchers.anyLong()))
-        .thenThrow(
-            new RemoteException("org.apache.hadoop.ipc.StandbyException",
-            "Standby NameNode"));
-
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
-        new RequestHedgingProxyProvider<>(conf, nnUri,
-            NamenodeProtocols.class, createFactory(active, standby));
-    try {
-      provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L);
-      Assert.fail("Should fail since the active namenode throws"
-          + " FileNotFoundException!");
-    } catch (MultiException me) {
-      for (Exception ex : me.getExceptions().values()) {
-        Exception rEx = ((RemoteException) ex).unwrapRemoteException();
-        if (rEx instanceof StandbyException) {
-          continue;
-        }
-        Assert.assertTrue(rEx instanceof FileNotFoundException);
-      }
-    }
-    Mockito.verify(active).getBlockLocations(Matchers.anyString(),
-        Matchers.anyLong(), Matchers.anyLong());
-    Mockito.verify(standby).getBlockLocations(Matchers.anyString(),
-        Matchers.anyLong(), Matchers.anyLong());
-  }
-
-  @Test
-  public void testHedgingWhenConnectException() throws Exception {
-    NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(active.getStats()).thenThrow(new ConnectException());
-
-    NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(standby.getStats())
-        .thenThrow(
-            new RemoteException("org.apache.hadoop.ipc.StandbyException",
-            "Standby NameNode"));
-
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
-        new RequestHedgingProxyProvider<>(conf, nnUri,
-            NamenodeProtocols.class, createFactory(active, standby));
-    try {
-      provider.getProxy().proxy.getStats();
-      Assert.fail("Should fail since the active namenode throws"
-          + " ConnectException!");
-    } catch (MultiException me) {
-      for (Exception ex : me.getExceptions().values()) {
-        if (ex instanceof RemoteException) {
-          Exception rEx = ((RemoteException) ex)
-              .unwrapRemoteException();
-          Assert.assertTrue("Unexpected RemoteException: " + rEx.getMessage(),
-              rEx instanceof StandbyException);
-        } else {
-          Assert.assertTrue(ex instanceof ConnectException);
-        }
-      }
-    }
-    Mockito.verify(active).getStats();
-    Mockito.verify(standby).getStats();
-  }
-
-  @Test
-  public void testHedgingWhenConnectAndEOFException() throws Exception {
-    NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(active.getStats()).thenThrow(new EOFException());
-
-    NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
-    Mockito.when(standby.getStats()).thenThrow(new ConnectException());
-
-    RequestHedgingProxyProvider<NamenodeProtocols> provider =
-        new RequestHedgingProxyProvider<>(conf, nnUri,
-            NamenodeProtocols.class, createFactory(active, standby));
-    try {
-      provider.getProxy().proxy.getStats();
-      Assert.fail("Should fail since both active and standby namenodes throw"
-          + " Exceptions!");
-    } catch (MultiException me) {
-      for (Exception ex : me.getExceptions().values()) {
-        if (!(ex instanceof ConnectException) &&
-            !(ex instanceof EOFException)) {
-          Assert.fail("Unexpected Exception " + ex.getMessage());
-        }
-      }
-    }
-    Mockito.verify(active).getStats();
-    Mockito.verify(standby).getStats();
-  }
-
-  private ProxyFactory<NamenodeProtocols> createFactory(
-      NamenodeProtocols... protos) {
-    final Iterator<NamenodeProtocols> iterator =
-        Lists.newArrayList(protos).iterator();
-    return new ProxyFactory<NamenodeProtocols>() {
-      @Override
-      public NamenodeProtocols createProxy(Configuration conf,
-          InetSocketAddress nnAddr, Class<NamenodeProtocols> xface,
-          UserGroupInformation ugi, boolean withRetries,
-          AtomicBoolean fallbackToSimpleAuth) throws IOException {
-        return iterator.next();
-      }
-    };
-  }
-}



