hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [27/47] hadoop git commit: Revert "HADOOP-13168. Support Future.get with timeout in ipc async calls."
Date Tue, 07 Jun 2016 17:35:14 GMT
Revert "HADOOP-13168. Support Future.get with timeout in ipc async calls."

This reverts commit 42c22f7e3d6e88bf1115f617f6e803288886d1ac.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e4450d47
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e4450d47
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e4450d47

Branch: refs/heads/HDFS-1312
Commit: e4450d47f19131818e1c040b6bd8d85ae8250475
Parents: b82c74b
Author: Andrew Wang <wang@apache.org>
Authored: Fri Jun 3 18:09:16 2016 -0700
Committer: Andrew Wang <wang@apache.org>
Committed: Fri Jun 3 18:09:16 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/ipc/Client.java | 119 ++++++++++--------
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |  62 +++++-----
 .../apache/hadoop/util/concurrent/AsyncGet.java |  60 ---------
 .../hadoop/util/concurrent/AsyncGetFuture.java  |  73 -----------
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     | 124 ++++++++-----------
 .../hadoop/hdfs/AsyncDistributedFileSystem.java |  24 +++-
 .../ClientNamenodeProtocolTranslatorPB.java     |  33 +++--
 7 files changed, 185 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4450d47/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index d1d5b17..9be4649 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -18,10 +18,46 @@
 
 package org.apache.hadoop.ipc;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.CodedOutputStream;
+import static org.apache.hadoop.ipc.RpcConstants.*;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.net.SocketFactory;
+import javax.security.sasl.Sasl;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -57,25 +93,14 @@ import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.AsyncGet;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.htrace.core.Span;
 import org.apache.htrace.core.Tracer;
 
-import javax.net.SocketFactory;
-import javax.security.sasl.Sasl;
-import java.io.*;
-import java.net.*;
-import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
-import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedOutputStream;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -94,8 +119,8 @@ public class Client implements AutoCloseable {
 
   private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
   private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
-  private static final ThreadLocal<Future<?>> ASYNC_RPC_RESPONSE
-      = new ThreadLocal<>();
+  private static final ThreadLocal<Future<?>>
+      RETURN_RPC_RESPONSE = new ThreadLocal<>();
   private static final ThreadLocal<Boolean> asynchronousMode =
       new ThreadLocal<Boolean>() {
         @Override
@@ -106,8 +131,8 @@ public class Client implements AutoCloseable {
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> Future<T> getAsyncRpcResponse() {
-    return (Future<T>) ASYNC_RPC_RESPONSE.get();
+  public static <T> Future<T> getReturnRpcResponse() {
+    return (Future<T>) RETURN_RPC_RESPONSE.get();
   }
 
   /** Set call id and retry count for the next call. */
@@ -354,11 +379,6 @@ public class Client implements AutoCloseable {
       }
     }
 
-    @Override
-    public String toString() {
-      return getClass().getSimpleName() + id;
-    }
-
     /** Indicate when the call is complete and the
      * value or error are available.  Notifies by default.  */
     protected synchronized void callComplete() {
@@ -1393,32 +1413,27 @@ public class Client implements AutoCloseable {
     }
 
     if (isAsynchronousMode()) {
-      final AsyncGet<Writable, IOException> asyncGet
-          = new AsyncGet<Writable, IOException>() {
+      Future<Writable> returnFuture = new AbstractFuture<Writable>() {
+        private final AtomicBoolean callled = new AtomicBoolean(false);
         @Override
-        public Writable get(long timeout, TimeUnit unit)
-            throws IOException, TimeoutException{
-          boolean done = true;
-          try {
-            final Writable w = getRpcResponse(call, connection, timeout, unit);
-            if (w == null) {
-              done = false;
-              throw new TimeoutException(call + " timed out "
-                  + timeout + " " + unit);
-            }
-            return w;
-          } finally {
-            if (done) {
+        public Writable get() throws InterruptedException, ExecutionException {
+          if (callled.compareAndSet(false, true)) {
+            try {
+              set(getRpcResponse(call, connection));
+            } catch (IOException ie) {
+              setException(ie);
+            } finally {
               releaseAsyncCall();
             }
           }
+          return super.get();
         }
       };
 
-      ASYNC_RPC_RESPONSE.set(new AsyncGetFuture<>(asyncGet));
+      RETURN_RPC_RESPONSE.set(returnFuture);
       return null;
     } else {
-      return getRpcResponse(call, connection, -1, null);
+      return getRpcResponse(call, connection);
     }
   }
 
@@ -1454,18 +1469,12 @@ public class Client implements AutoCloseable {
     return asyncCallCounter.get();
   }
 
-  /** @return the rpc response or, in case of timeout, null. */
-  private Writable getRpcResponse(final Call call, final Connection connection,
-      final long timeout, final TimeUnit unit) throws IOException {
+  private Writable getRpcResponse(final Call call, final Connection connection)
+      throws IOException {
     synchronized (call) {
       while (!call.done) {
         try {
-          final long waitTimeout = AsyncGet.Util.asyncGetTimeout2WaitTimeout(
-              timeout, unit);
-          call.wait(waitTimeout); // wait for the result
-          if (waitTimeout > 0 && !call.done) {
-            return null;
-          }
+          call.wait();                           // wait for the result
         } catch (InterruptedException ie) {
           Thread.currentThread().interrupt();
           throw new InterruptedIOException("Call interrupted");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4450d47/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 0f43fc6..8fcdb78 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -18,9 +18,21 @@
 
 package org.apache.hadoop.ipc;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.*;
-import com.google.protobuf.Descriptors.MethodDescriptor;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.SocketFactory;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,23 +52,17 @@ import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.AsyncGet;
 import org.apache.htrace.core.TraceScope;
 import org.apache.htrace.core.Tracer;
 
-import javax.net.SocketFactory;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
 
 /**
  * RPC Engine for for protobuf based RPCs.
@@ -64,8 +70,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @InterfaceStability.Evolving
 public class ProtobufRpcEngine implements RpcEngine {
   public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
-  private static final ThreadLocal<AsyncGet<Message, Exception>>
-      ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
+  private static final ThreadLocal<Callable<?>>
+      RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
 
   static { // Register the rpcRequest deserializer for WritableRpcEngine 
     org.apache.hadoop.ipc.Server.registerProtocolEngine(
@@ -75,9 +81,10 @@ public class ProtobufRpcEngine implements RpcEngine {
 
   private static final ClientCache CLIENTS = new ClientCache();
 
+  @SuppressWarnings("unchecked")
   @Unstable
-  public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
-    return ASYNC_RETURN_MESSAGE.get();
+  public static <T> Callable<T> getReturnMessageCallback() {
+    return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
   }
 
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
@@ -256,17 +263,14 @@ public class ProtobufRpcEngine implements RpcEngine {
       }
       
       if (Client.isAsynchronousMode()) {
-        final Future<RpcResponseWrapper> frrw = Client.getAsyncRpcResponse();
-        final AsyncGet<Message, Exception> asyncGet
-            = new AsyncGet<Message, Exception>() {
+        final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
+        Callable<Message> callback = new Callable<Message>() {
           @Override
-          public Message get(long timeout, TimeUnit unit) throws Exception {
-            final RpcResponseWrapper rrw = timeout < 0?
-                frrw.get(): frrw.get(timeout, unit);
-            return getReturnMessage(method, rrw);
+          public Message call() throws Exception {
+            return getReturnMessage(method, frrw.get());
           }
         };
-        ASYNC_RETURN_MESSAGE.set(asyncGet);
+        RETURN_MESSAGE_CALLBACK.set(callback);
         return null;
       } else {
         return getReturnMessage(method, val);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4450d47/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
deleted file mode 100644
index 5eac869..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGet.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.util.concurrent;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * This interface defines an asynchronous {@link #get(long, TimeUnit)} method.
- *
- * When the return value is still being computed, invoking
- * {@link #get(long, TimeUnit)} will result in a {@link TimeoutException}.
- * The method should be invoked again and again
- * until the underlying computation is completed.
- *
- * @param <R> The type of the return value.
- * @param <E> The exception type that the underlying implementation may throw.
- */
-public interface AsyncGet<R, E extends Throwable> {
-  /**
-   * Get the result.
-   *
-   * @param timeout The maximum time period to wait.
-   *                When timeout == 0, it does not wait at all.
-   *                When timeout < 0, it waits indefinitely.
-   * @param unit The unit of the timeout value
-   * @return the result, which is possibly null.
-   * @throws E an exception thrown by the underlying implementation.
-   * @throws TimeoutException if it cannot return after the given time period.
-   * @throws InterruptedException if the thread is interrupted.
-   */
-  R get(long timeout, TimeUnit unit)
-      throws E, TimeoutException, InterruptedException;
-
-  /** Utility */
-  class Util {
-    /**
-     * @return {@link Object#wait(long)} timeout converted
-     *         from {@link #get(long, TimeUnit)} timeout.
-     */
-    public static long asyncGetTimeout2WaitTimeout(long timeout, TimeUnit unit){
-      return timeout < 0? 0: timeout == 0? 1:unit.toMillis(timeout);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4450d47/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
deleted file mode 100644
index d687867..0000000
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/AsyncGetFuture.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.util.concurrent;
-
-import com.google.common.util.concurrent.AbstractFuture;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/** A {@link Future} implemented using an {@link AsyncGet} object. */
-public class AsyncGetFuture<T, E extends Throwable> extends AbstractFuture<T>
{
-  public static final Log LOG = LogFactory.getLog(AsyncGetFuture.class);
-
-  private final AtomicBoolean called = new AtomicBoolean(false);
-  private final AsyncGet<T, E> asyncGet;
-
-  public AsyncGetFuture(AsyncGet<T, E> asyncGet) {
-    this.asyncGet = asyncGet;
-  }
-
-  private void callAsyncGet(long timeout, TimeUnit unit) {
-    if (!isCancelled() && called.compareAndSet(false, true)) {
-      try {
-        set(asyncGet.get(timeout, unit));
-      } catch (TimeoutException te) {
-        LOG.trace("TRACE", te);
-        called.compareAndSet(true, false);
-      } catch (Throwable e) {
-        LOG.trace("TRACE", e);
-        setException(e);
-      }
-    }
-  }
-
-  @Override
-  public T get() throws InterruptedException, ExecutionException {
-    callAsyncGet(-1, TimeUnit.MILLISECONDS);
-    return super.get();
-  }
-
-  @Override
-  public T get(long timeout, TimeUnit unit)
-      throws InterruptedException, TimeoutException, ExecutionException {
-    callAsyncGet(timeout, unit);
-    return super.get(0, TimeUnit.MILLISECONDS);
-  }
-
-  @Override
-  public boolean isDone() {
-    callAsyncGet(0, TimeUnit.MILLISECONDS);
-    return super.isDone();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4450d47/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 0ad191b..8ee3a2c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -18,6 +18,20 @@
 
 package org.apache.hadoop.ipc;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -34,17 +48,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
 public class TestAsyncIPC {
 
   private static Configuration conf;
@@ -84,50 +87,26 @@ public class TestAsyncIPC {
         try {
           final long param = TestIPC.RANDOM.nextLong();
           TestIPC.call(client, param, server, conf);
-          returnFutures.put(i, Client.getAsyncRpcResponse());
+          Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
+          returnFutures.put(i, returnFuture);
           expectedValues.put(i, param);
         } catch (Exception e) {
+          LOG.fatal("Caught: " + StringUtils.stringifyException(e));
           failed = true;
-          throw new RuntimeException(e);
         }
       }
     }
 
-    void assertReturnValues() throws InterruptedException, ExecutionException {
+    public void waitForReturnValues() throws InterruptedException,
+        ExecutionException {
       for (int i = 0; i < count; i++) {
         LongWritable value = returnFutures.get(i).get();
-        Assert.assertEquals("call" + i + " failed.",
-            expectedValues.get(i).longValue(), value.get());
-      }
-      Assert.assertFalse(failed);
-    }
-
-    void assertReturnValues(long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException {
-      final boolean[] checked = new boolean[count];
-      for(boolean done = false; !done;) {
-        done = true;
-        for (int i = 0; i < count; i++) {
-          if (checked[i]) {
-            continue;
-          } else {
-            done = false;
-          }
-
-          final LongWritable value;
-          try {
-            value = returnFutures.get(i).get(timeout, unit);
-          } catch (TimeoutException e) {
-            LOG.info("call" + i + " caught ", e);
-            continue;
-          }
-
-          Assert.assertEquals("call" + i + " failed.",
-              expectedValues.get(i).longValue(), value.get());
-          checked[i] = true;
+        if (expectedValues.get(i) != value.get()) {
+          LOG.fatal(String.format("Call-%d failed!", i));
+          failed = true;
+          break;
         }
       }
-      Assert.assertFalse(failed);
     }
   }
 
@@ -204,7 +183,8 @@ public class TestAsyncIPC {
 
     private void doCall(final int idx, final long param) throws IOException {
       TestIPC.call(client, param, server, conf);
-      returnFutures.put(idx, Client.getAsyncRpcResponse());
+      Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
+      returnFutures.put(idx, returnFuture);
       expectedValues.put(idx, param);
     }
 
@@ -253,7 +233,10 @@ public class TestAsyncIPC {
     }
     for (int i = 0; i < callerCount; i++) {
       callers[i].join();
-      callers[i].assertReturnValues();
+      callers[i].waitForReturnValues();
+      String msg = String.format("Expected not failed for caller-%d: %s.", i,
+          callers[i]);
+      assertFalse(msg, callers[i].failed);
     }
     for (int i = 0; i < clientCount; i++) {
       clients[i].stop();
@@ -275,37 +258,25 @@ public class TestAsyncIPC {
     try {
       AsyncCaller caller = new AsyncCaller(client, addr, callCount);
       caller.run();
-      caller.assertReturnValues();
-      caller.assertReturnValues();
-      caller.assertReturnValues();
-      Assert.assertEquals(asyncCallCount, client.getAsyncCallCount());
-    } finally {
-      client.stop();
-      server.stop();
-    }
-  }
 
-  @Test(timeout = 60000)
-  public void testFutureGetWithTimeout() throws IOException,
-      InterruptedException, ExecutionException {
-//    GenericTestUtils.setLogLevel(AsyncGetFuture.LOG, Level.ALL);
-    final Server server = new TestIPC.TestServer(10, true, conf);
-    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
-    server.start();
+      caller.waitForReturnValues();
+      String msg = String.format(
+          "First time, expected not failed for caller: %s.", caller);
+      assertFalse(msg, caller.failed);
 
-    final Client client = new Client(LongWritable.class, conf);
+      caller.waitForReturnValues();
+      assertTrue(asyncCallCount == client.getAsyncCallCount());
+      msg = String.format("Second time, expected not failed for caller: %s.",
+          caller);
+      assertFalse(msg, caller.failed);
 
-    try {
-      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
-      caller.run();
-      caller.assertReturnValues(10, TimeUnit.MILLISECONDS);
+      assertTrue(asyncCallCount == client.getAsyncCallCount());
     } finally {
       client.stop();
       server.stop();
     }
   }
 
-
   public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
       int clientCount, int callerCount, int callCount) throws IOException,
       InterruptedException, ExecutionException {
@@ -396,7 +367,9 @@ public class TestAsyncIPC {
       server.start();
       final AsyncCaller caller = new AsyncCaller(client, addr, 4);
       caller.run();
-      caller.assertReturnValues();
+      caller.waitForReturnValues();
+      String msg = String.format("Expected not failed for caller: %s.", caller);
+      assertFalse(msg, caller.failed);
     } finally {
       client.stop();
       server.stop();
@@ -433,7 +406,9 @@ public class TestAsyncIPC {
       server.start();
       final AsyncCaller caller = new AsyncCaller(client, addr, 10);
       caller.run();
-      caller.assertReturnValues();
+      caller.waitForReturnValues();
+      String msg = String.format("Expected not failed for caller: %s.", caller);
+      assertFalse(msg, caller.failed);
     } finally {
       client.stop();
       server.stop();
@@ -468,7 +443,9 @@ public class TestAsyncIPC {
       server.start();
       final AsyncCaller caller = new AsyncCaller(client, addr, 10);
       caller.run();
-      caller.assertReturnValues();
+      caller.waitForReturnValues();
+      String msg = String.format("Expected not failed for caller: %s.", caller);
+      assertFalse(msg, caller.failed);
     } finally {
       client.stop();
       server.stop();
@@ -512,7 +489,10 @@ public class TestAsyncIPC {
       }
       for (int i = 0; i < callerCount; ++i) {
         callers[i].join();
-        callers[i].assertReturnValues();
+        callers[i].waitForReturnValues();
+        String msg = String.format("Expected not failed for caller-%d: %s.", i,
+            callers[i]);
+        assertFalse(msg, callers[i].failed);
       }
     } finally {
       client.stop();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4450d47/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 6bfd71d..4fe0861 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -19,16 +19,20 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
-import org.apache.hadoop.util.concurrent.AsyncGetFuture;
 import org.apache.hadoop.ipc.Client;
 
+import com.google.common.util.concurrent.AbstractFuture;
+
 /****************************************************************
  * Implementation of the asynchronous distributed file system.
  * This instance of this class is the way end-user code interacts
@@ -48,8 +52,22 @@ public class AsyncDistributedFileSystem {
   }
 
   static <T> Future<T> getReturnValue() {
-    return new AsyncGetFuture<>(
-        ClientNamenodeProtocolTranslatorPB.getAsyncReturnValue());
+    final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
+        .getReturnValueCallback();
+    Future<T> returnFuture = new AbstractFuture<T>() {
+      private final AtomicBoolean called = new AtomicBoolean(false);
+      public T get() throws InterruptedException, ExecutionException {
+        if (called.compareAndSet(false, true)) {
+          try {
+            set(returnValueCallback.call());
+          } catch (Exception e) {
+            setException(e);
+          }
+        }
+        return super.get();
+      }
+    };
+    return returnFuture;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4450d47/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 939c1ac..faa925c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,8 +24,7 @@ import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.collect.Lists;
-
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Callable;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -199,7 +198,6 @@ import org.apache.hadoop.security.token.Token;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
-import org.apache.hadoop.util.concurrent.AsyncGet;
 
 /**
  * This class forwards NN's ClientProtocol calls as RPC calls to the NN server
@@ -211,8 +209,8 @@ import org.apache.hadoop.util.concurrent.AsyncGet;
 public class ClientNamenodeProtocolTranslatorPB implements
     ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
-  private static final ThreadLocal<AsyncGet<?, Exception>>
-      ASYNC_RETURN_VALUE = new ThreadLocal<>();
+  private static final ThreadLocal<Callable<?>>
+      RETURN_VALUE_CALLBACK = new ThreadLocal<>();
 
   static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
       GetServerDefaultsRequestProto.newBuilder().build();
@@ -248,8 +246,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> AsyncGet<T, Exception> getAsyncReturnValue() {
-    return (AsyncGet<T, Exception>) ASYNC_RETURN_VALUE.get();
+  public static <T> Callable<T> getReturnValueCallback() {
+    return (Callable<T>) RETURN_VALUE_CALLBACK.get();
   }
 
   @Override
@@ -371,7 +369,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       if (Client.isAsynchronousMode()) {
         rpcProxy.setPermission(null, req);
-        setAsyncReturnValue();
+        setReturnValueCallback();
       } else {
         rpcProxy.setPermission(null, req);
       }
@@ -380,18 +378,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
   }
 
-  private void setAsyncReturnValue() {
-    final AsyncGet<Message, Exception> asyncReturnMessage
-        = ProtobufRpcEngine.getAsyncReturnMessage();
-    final AsyncGet<Void, Exception> asyncGet
-        = new AsyncGet<Void, Exception>() {
+  private void setReturnValueCallback() {
+    final Callable<Message> returnMessageCallback = ProtobufRpcEngine
+        .getReturnMessageCallback();
+    Callable<Void> callBack = new Callable<Void>() {
       @Override
-      public Void get(long timeout, TimeUnit unit) throws Exception {
-        asyncReturnMessage.get(timeout, unit);
+      public Void call() throws Exception {
+        returnMessageCallback.call();
         return null;
       }
     };
-    ASYNC_RETURN_VALUE.set(asyncGet);
+    RETURN_VALUE_CALLBACK.set(callBack);
   }
 
   @Override
@@ -406,7 +403,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       if (Client.isAsynchronousMode()) {
         rpcProxy.setOwner(null, req.build());
-        setAsyncReturnValue();
+        setReturnValueCallback();
       } else {
         rpcProxy.setOwner(null, req.build());
       }
@@ -539,7 +536,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       if (Client.isAsynchronousMode()) {
         rpcProxy.rename2(null, req);
-        setAsyncReturnValue();
+        setReturnValueCallback();
       } else {
         rpcProxy.rename2(null, req);
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message