hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jya...@apache.org
Subject svn commit: r1509018 - in /hbase/branches/0.95: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-client/src/test/java/org/apache/hadoop/hbase/client/ hbase-server/src/main/java/org...
Date Wed, 31 Jul 2013 20:20:46 GMT
Author: jyates
Date: Wed Jul 31 20:20:45 2013
New Revision: 1509018

URL: http://svn.apache.org/r1509018
Log:
HBASE-9049: Generalize ServerCallable creation to support custom callables

Added:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
Removed:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCaller.java
Modified:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
    hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1509018&r1=1509017&r2=1509018&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Wed Jul 31 20:20:45 2013
@@ -101,6 +101,7 @@ class AsyncProcess<CResult> {
   protected int numTries;
   protected final boolean useServerTrackerForRetries;
   protected int serverTrackerTimeout;
+  protected RpcRetryingCallerFactory rpcCallerFactory;
 
 
   /**
@@ -167,7 +168,8 @@ class AsyncProcess<CResult> {
   }
 
   public AsyncProcess(HConnection hc, byte[] tableName, ExecutorService pool,
-                      AsyncProcessCallback<CResult> callback, Configuration conf) {
+      AsyncProcessCallback<CResult> callback, Configuration conf, 
+      RpcRetryingCallerFactory rpcCaller) {
     this.hConnection = hc;
     this.tableName = tableName;
     this.pool = pool;
@@ -201,6 +203,8 @@ class AsyncProcess<CResult> {
         serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
       }
     }
+
+    this.rpcCallerFactory = rpcCaller;
   }
 
   /**
@@ -452,7 +456,7 @@ class AsyncProcess<CResult> {
    */
   protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
     // callable is unused.
-    return new RpcRetryingCaller<MultiResponse>();
+    return rpcCallerFactory.<MultiResponse> newCaller();
   }
 
   /**

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java?rev=1509018&r1=1509017&r2=1509018&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java Wed Jul 31 20:20:45 2013
@@ -66,7 +66,7 @@ public class ClientScanner extends Abstr
     private final byte[] tableName;
     private final int scannerTimeout;
     private boolean scanMetricsPublished = false;
-    private ScannerCaller caller = new ScannerCaller();
+    private RpcRetryingCaller<Result []> caller;
 
     /**
      * Create a new ClientScanner for the specified table. An HConnection will be
@@ -83,6 +83,7 @@ public class ClientScanner extends Abstr
       this(conf, scan, tableName, HConnectionManager.getConnection(conf));
     }
 
+
     /**
      * Create a new ClientScanner for the specified table
      * Note that the passed {@link Scan}'s start row maybe changed changed.
@@ -93,8 +94,22 @@ public class ClientScanner extends Abstr
      * @param connection Connection identifying the cluster
      * @throws IOException
      */
-    public ClientScanner(final Configuration conf, final Scan scan,
-      final byte[] tableName, HConnection connection) throws IOException {
+  public ClientScanner(final Configuration conf, final Scan scan, final byte[] tableName,
+      HConnection connection) throws IOException {
+    this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf));
+  }
+
+  /**
+   * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
+   * row maybe changed changed.
+   * @param conf The {@link Configuration} to use.
+   * @param scan {@link Scan} to use in this scanner
+   * @param tableName The table that we wish to scan
+   * @param connection Connection identifying the cluster
+   * @throws IOException
+   */
+  public ClientScanner(final Configuration conf, final Scan scan, final byte[] tableName,
+      HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Scan table=" + Bytes.toString(tableName)
             + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
@@ -131,6 +146,8 @@ public class ClientScanner extends Abstr
             HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
       }
 
+    this.caller = rpcFactory.<Result[]> newCaller();
+
       // initialize the scanner
       nextScanner(false);
     }
@@ -180,7 +197,7 @@ public class ClientScanner extends Abstr
       // Close the previous scanner if it's open
       if (this.callable != null) {
         this.callable.setClose();
-        this.caller.callWithRetries(callable, getConnection().getConfiguration());
+        this.caller.callWithRetries(callable);
         this.callable = null;
       }
 
@@ -217,7 +234,7 @@ public class ClientScanner extends Abstr
         callable = getScannerCallable(localStartKey);
         // Open a scanner on the region server starting at the
         // beginning of the region
-        this.caller.callWithRetries(callable, getConnection().getConfiguration());
+        this.caller.callWithRetries(callable);
         this.currentRegion = callable.getHRegionInfo();
         if (this.scanMetrics != null) {
           this.scanMetrics.countOfRegions.incrementAndGet();
@@ -277,10 +294,10 @@ public class ClientScanner extends Abstr
             // Server returns a null values if scanning is to stop.  Else,
             // returns an empty array if scanning is to go on and we've just
             // exhausted current region.
-            values = this.caller.callWithRetries(callable, getConnection().getConfiguration());
+            values = this.caller.callWithRetries(callable);
             if (skipFirst && values != null && values.length == 1) {
               skipFirst = false; // Already skipped, unset it before scanning again
-              values = this.caller.callWithRetries(callable, getConnection().getConfiguration());
+              values = this.caller.callWithRetries(callable);
             }
             retryAfterOutOfOrderException  = true;
           } catch (DoNotRetryIOException e) {
@@ -403,7 +420,7 @@ public class ClientScanner extends Abstr
       if (callable != null) {
         callable.setClose();
         try {
-          this.caller.callWithRetries(callable, getConnection().getConfiguration());
+          this.caller.callWithRetries(callable);
         } catch (IOException e) {
           // We used to catch this error, interpret, and rethrow. However, we
           // have since decided that it's not nice for a scanner's close to

Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java?rev=1509018&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java Wed Jul 31 20:20:45 2013
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+public class DelegatingRetryingCallable<T, D extends RetryingCallable<T>> implements
+    RetryingCallable<T> {
+  protected final D delegate;
+
+  public DelegatingRetryingCallable(D delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public T call() throws Exception {
+    return delegate.call();
+  }
+
+  @Override
+  public void prepare(boolean reload) throws IOException {
+    delegate.prepare(reload);
+  }
+
+  @Override
+  public void throwable(Throwable t, boolean retrying) {
+    delegate.throwable(t, retrying);
+  }
+
+  @Override
+  public String getExceptionMessageAdditionalDetail() {
+    return delegate.getExceptionMessageAdditionalDetail();
+  }
+
+  @Override
+  public long sleep(long pause, int tries) {
+    return delegate.sleep(pause, tries);
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1509018&r1=1509017&r2=1509018&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Wed Jul 31 20:20:45 2013
@@ -154,6 +154,8 @@ public class HBaseAdmin implements Abort
   private boolean aborted;
   private boolean cleanupConnectionOnClose = false; // close the connection in close()
 
+  private RpcRetryingCallerFactory rpcCallerFactory;
+
   /**
    * Constructor.
    * See {@link #HBaseAdmin(HConnection connection)}
@@ -186,6 +188,7 @@ public class HBaseAdmin implements Abort
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.retryLongerMultiplier = this.conf.getInt(
         "hbase.client.retries.longer.multiplier", 10);
+    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
   }
 
   /**
@@ -2653,10 +2656,9 @@ public class HBaseAdmin implements Abort
    */
   abstract static class MasterAdminCallable<V> extends MasterCallable<V> {
     protected MasterAdminKeepAliveConnection masterAdmin;
-    private final HConnection connection;
 
     public MasterAdminCallable(final HConnection connection) {
-      this.connection = connection;
+      super(connection);
     }
 
     @Override
@@ -2675,10 +2677,9 @@ public class HBaseAdmin implements Abort
    */
   abstract static class MasterMonitorCallable<V> extends MasterCallable<V> {
     protected MasterMonitorKeepAliveConnection masterMonitor;
-    private final HConnection connection;
 
     public MasterMonitorCallable(final HConnection connection) {
-      this.connection = connection;
+      super(connection);
     }
 
     @Override
@@ -2698,6 +2699,12 @@ public class HBaseAdmin implements Abort
    * @param <V>
    */
   abstract static class MasterCallable<V> implements RetryingCallable<V>, Closeable {
+    protected HConnection connection;
+
+    public MasterCallable(final HConnection connection) {
+      this.connection = connection;
+    }
+
     @Override
     public void throwable(Throwable t, boolean retrying) {
     }
@@ -2714,9 +2721,9 @@ public class HBaseAdmin implements Abort
   }
 
   private <V> V executeCallable(MasterCallable<V> callable) throws IOException {
-    RpcRetryingCaller<V> caller = new RpcRetryingCaller<V>();
+    RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller();
     try {
-      return caller.callWithRetries(callable, getConfiguration());
+      return caller.callWithRetries(callable);
     } finally {
       callable.close();
     }

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1509018&r1=1509017&r2=1509018&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Jul 31 20:20:45 2013
@@ -69,7 +69,6 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest;
@@ -2100,7 +2099,8 @@ public class HConnectionManager {
     // For tests.
     protected <R> AsyncProcess createAsyncProcess(byte[] tableName, ExecutorService pool,
            AsyncProcess.AsyncProcessCallback<R> callback, Configuration conf) {
-      return new AsyncProcess<R>(this, tableName, pool, callback, conf);
+      return new AsyncProcess<R>(this, tableName, pool, callback, conf,
+          RpcRetryingCallerFactory.instantiate(conf));
     }
 
 

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1509018&r1=1509017&r2=1509018&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Jul 31 20:20:45 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 
 import java.io.Closeable;
@@ -135,6 +136,7 @@ public class HTable implements HTableInt
 
   /** The Async process for puts with autoflush set to false or multiputs */
   protected AsyncProcess<Object> ap;
+  private RpcRetryingCallerFactory rpcCallerFactory;
 
   /**
    * Creates an object to access a HBase table.
@@ -267,7 +269,9 @@ public class HTable implements HTableInt
         HConstants.HBASE_CLIENT_SCANNER_CACHING,
         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
 
-    ap = new AsyncProcess<Object>(connection, tableName, pool, null, configuration);
+    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
+    ap = new AsyncProcess<Object>(connection, tableName, pool, null, 
+        configuration, rpcCallerFactory);
 
     this.maxKeyValueSize = this.configuration.getInt(
         "hbase.client.keyvalue.maxsize", -1);
@@ -596,9 +600,8 @@ public class HTable implements HTableInt
            getLocation().getRegionInfo().getRegionName(), row, family);
        }
      };
-     return new RpcRetryingCaller<Result>().
-       callWithRetries(callable, getConfiguration(), this.operationTimeout);
-   }
+    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
+  }
 
    /**
     * {@inheritDoc}
@@ -643,8 +646,7 @@ public class HTable implements HTableInt
         return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
       }
     };
-    return new RpcRetryingCaller<Result>().
-      callWithRetries(callable, getConfiguration(), this.operationTimeout);
+    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -719,8 +721,7 @@ public class HTable implements HTableInt
         }
       }
     };
-    new RpcRetryingCaller<Boolean>().
-      callWithRetries(callable, getConfiguration(), this.operationTimeout);
+    rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -856,8 +857,7 @@ public class HTable implements HTableInt
         return null;
       }
     };
-    new RpcRetryingCaller<Void>().
-      callWithRetries(callable, getConfiguration(), this.operationTimeout);
+    rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -884,8 +884,7 @@ public class HTable implements HTableInt
           }
         }
       };
-    return new RpcRetryingCaller<Result>().
-      callWithRetries(callable, getConfiguration(), this.operationTimeout);
+    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -911,8 +910,7 @@ public class HTable implements HTableInt
           }
         }
       };
-    return new RpcRetryingCaller<Result>().
-      callWithRetries(callable, getConfiguration(), this.operationTimeout);
+    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -962,8 +960,7 @@ public class HTable implements HTableInt
           }
         }
       };
-    return new RpcRetryingCaller<Long>().
-      callWithRetries(callable, getConfiguration(), this.operationTimeout);
+    return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -988,8 +985,7 @@ public class HTable implements HTableInt
           }
         }
       };
-    return new RpcRetryingCaller<Boolean>().
-      callWithRetries(callable, getConfiguration(), this.operationTimeout);
+    return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
   }
 
 
@@ -1015,8 +1011,7 @@ public class HTable implements HTableInt
           }
         }
       };
-    return new RpcRetryingCaller<Boolean>().
-      callWithRetries(callable, getConfiguration(), this.operationTimeout);
+    return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -1037,8 +1032,7 @@ public class HTable implements HTableInt
         }
       }
     };
-    return new RpcRetryingCaller<Boolean>().
-      callWithRetries(callable, getConfiguration(), this.operationTimeout);
+    return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -1144,8 +1138,8 @@ public class HTable implements HTableInt
               }
             }
           };
-          return new RpcRetryingCaller<List<Boolean>>().
-            callWithRetries(callable, getConfiguration(), operationTimeout);
+          return rpcCallerFactory.<List<Boolean>> newCaller().callWithRetries(callable,
+            operationTimeout);
         }
       };
       futures.put(getsByRegionEntry.getKey(), pool.submit(callable));

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java?rev=1509018&r1=1509017&r2=1509018&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java Wed Jul 31 20:20:45 2013
@@ -62,8 +62,16 @@ public class RpcRetryingCaller<T> {
   private long startTime, endTime;
   private final static int MIN_RPC_TIMEOUT = 2000;
 
-  public RpcRetryingCaller() {
+  private final long pause;
+  private final int retries;
+
+  public RpcRetryingCaller(Configuration conf) {
     super();
+    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 
+      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+    this.retries =
+        conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
   }
 
   private void beforeCall() {
@@ -83,32 +91,20 @@ public class RpcRetryingCaller<T> {
     this.endTime = EnvironmentEdgeManager.currentTimeMillis();
   }
 
-  public synchronized T callWithRetries(RetryingCallable<T> callable, final Configuration conf)
-  throws IOException, RuntimeException {
-    return callWithRetries(callable, conf, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-  }
-
-  public synchronized T callWithRetries(RetryingCallable<T> callable, final Configuration conf,
-      final int callTimeout)
-  throws IOException, RuntimeException {
-    final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
-      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
-    final int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-    return callWithRetries(callable, callTimeout, pause, numRetries);
+  public synchronized T callWithRetries(RetryingCallable<T> callable) throws IOException,
+      RuntimeException {
+    return callWithRetries(callable, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
   }
 
   /**
    * Retries if invocation fails.
-   * @param conf
    * @param callTimeout Timeout for this call
    * @param callable The {@link RetryingCallable} to run.
    * @return an object of type T
    * @throws IOException if a remote or network exception occurs
    * @throws RuntimeException other unspecified error
    */
-  synchronized T callWithRetries(RetryingCallable<T> callable, int callTimeout, final long pause,
-      final int retries)
+  public synchronized T callWithRetries(RetryingCallable<T> callable, int callTimeout)
   throws IOException, RuntimeException {
     this.callTimeout = callTimeout;
     List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =

Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java?rev=1509018&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java Wed Jul 31 20:20:45 2013
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * Factory to create an {@link RpcRetryingCaller}
+ */
+public class RpcRetryingCallerFactory {
+
+  /** Configuration key for a custom {@link RpcRetryingCaller} */
+  public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
+  protected final Configuration conf;
+
+  public RpcRetryingCallerFactory(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public <T> RpcRetryingCaller<T> newCaller() {
+    return new RpcRetryingCaller<T>(conf);
+  }
+
+  public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
+    String rpcCallerFactoryClazz =
+        configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
+          RpcRetryingCallerFactory.class.getName());
+    return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
+      new Class[] { Configuration.class }, new Object[] { configuration });
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java?rev=1509018&r1=1509017&r2=1509018&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java Wed Jul 31 20:20:45 2013
@@ -25,7 +25,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
-import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
@@ -52,10 +52,13 @@ public class RegionCoprocessorRpcChannel
   private final byte[] row;
   private byte[] lastRegion;
 
+  private RpcRetryingCallerFactory rpcFactory;
+
   public RegionCoprocessorRpcChannel(HConnection conn, byte[] table, byte[] row) {
     this.connection = conn;
     this.table = table;
     this.row = row;
+    this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration());
   }
 
   @Override
@@ -83,8 +86,8 @@ public class RegionCoprocessorRpcChannel
             return ProtobufUtil.execService(getStub(), call, regionName);
           }
         };
-    CoprocessorServiceResponse result = new RpcRetryingCaller<CoprocessorServiceResponse>().
-        callWithRetries(callable, this.connection.getConfiguration());
+    CoprocessorServiceResponse result = rpcFactory.<CoprocessorServiceResponse> newCaller()
+        .callWithRetries(callable);
     Message response = null;
     if (result.getValue().hasValue()) {
       response = responsePrototype.newBuilderForType()

Modified: hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1509018&r1=1509017&r2=1509018&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java (original)
+++ hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java Wed Jul 31 20:20:45 2013
@@ -51,7 +51,7 @@ public class TestAsyncProcess {
   private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
   private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
   private static final byte[] FAILS = "FAILS".getBytes();
-  private Configuration conf = new Configuration();
+  private static final Configuration conf = new Configuration();
 
 
   private static ServerName sn = new ServerName("localhost:10,1254");
@@ -67,13 +67,13 @@ public class TestAsyncProcess {
     public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf) {
       super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("test-TestAsyncProcess")),
-        callback, conf);
+          callback, conf, new RpcRetryingCallerFactory(conf));
     }
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
       final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti());
-      return new RpcRetryingCaller<MultiResponse>() {
+      return new RpcRetryingCaller<MultiResponse>(conf) {
         @Override
         public MultiResponse callWithoutRetries( RetryingCallable<MultiResponse> callable)
         throws IOException, RuntimeException {

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1509018&r1=1509017&r2=1509018&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Wed Jul 31 20:20:45 2013
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.client.HC
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
@@ -588,7 +589,9 @@ public class LoadIncrementalHFiles exten
 
     try {
       List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
-      boolean success = new RpcRetryingCaller<Boolean>().callWithRetries(svrCallable, getConf());
+      Configuration conf = getConf();
+      boolean success = RpcRetryingCallerFactory.instantiate(conf).<Boolean> newCaller()
+          .callWithRetries(svrCallable);
       if (!success) {
         LOG.warn("Attempt to bulk load region containing "
             + Bytes.toStringBinary(first) + " into table "

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java?rev=1509018&r1=1509017&r2=1509018&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java Wed Jul 31 20:20:45 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.HC
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
@@ -162,9 +163,10 @@ public class WALEditsReplaySink {
   private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
       final List<Action<Row>> actions) throws IOException {
     try {
+      RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
       ReplayServerCallable<MultiResponse> callable = new ReplayServerCallable<MultiResponse>(
           this.conn, this.tableName, regionLoc, regionInfo, actions);
-      new RpcRetryingCaller<MultiResponse>().callWithRetries(callable, conf, this.replayTimeout);
+      factory.<MultiResponse> newCaller().callWithRetries(callable, this.replayTimeout);
     } catch (IOException ie) {
       if (skipErrors) {
         LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java?rev=1509018&r1=1509017&r2=1509018&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java Wed Jul 31 20:20:45 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -157,8 +158,9 @@ public class TestHRegionServerBulkLoad {
           return null;
         }
       };
-      RpcRetryingCaller<Void> caller = new RpcRetryingCaller<Void>();
-      caller.callWithRetries(callable, UTIL.getConfiguration());
+      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
+      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
+      caller.callWithRetries(callable);
 
       // Periodically do compaction to reduce the number of open file handles.
       if (numBulkLoads.get() % 10 == 0) {
@@ -178,7 +180,7 @@ public class TestHRegionServerBulkLoad {
             return null;
           }
         };
-        caller.callWithRetries(callable, UTIL.getConfiguration());
+        caller.callWithRetries(callable);
       }
     }
   }



Mime
View raw message