hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [08/12] hbase git commit: Procedure v2 - Use nonces for double submits from client (Stephen Yuan Jiang)
Date Thu, 09 Jul 2015 15:33:46 GMT
Procedure v2 - Use nonces for double submits from client (Stephen Yuan Jiang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4eb84651
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4eb84651
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4eb84651

Branch: refs/heads/branch-1
Commit: 4eb84651a2b6d02d2074143308cef5d0f4b856a3
Parents: c66ff88
Author: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Authored: Thu Jul 9 07:44:36 2015 -0700
Committer: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Committed: Thu Jul 9 08:13:32 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/client/HBaseAdmin.java  |   30 +-
 .../hadoop/hbase/protobuf/RequestConverter.java |   69 +-
 .../org/apache/hadoop/hbase/util/NonceKey.java  |   65 +
 .../hadoop/hbase/procedure2/Procedure.java      |   27 +
 .../hbase/procedure2/ProcedureExecutor.java     |   89 +-
 .../hbase/procedure2/ProcedureResult.java       |   34 +-
 .../procedure2/ProcedureTestingUtility.java     |   11 +-
 .../hbase/procedure2/TestProcedureRecovery.java |   39 +-
 .../hbase/protobuf/generated/MasterProtos.java  | 2091 ++++++++++++++++--
 .../protobuf/generated/ProcedureProtos.java     |  257 ++-
 hbase-protocol/src/main/protobuf/Master.proto   |   18 +
 .../src/main/protobuf/Procedure.proto           |    4 +
 .../org/apache/hadoop/hbase/master/HMaster.java |  111 +-
 .../hadoop/hbase/master/MasterRpcServices.java  |   51 +-
 .../hadoop/hbase/master/MasterServices.java     |   70 +-
 .../hbase/regionserver/ServerNonceManager.java  |   33 +-
 .../security/access/AccessControlLists.java     |    4 +-
 .../visibility/VisibilityController.java        |    2 +-
 .../hbase/client/TestHBaseAdminNoCluster.java   |    5 +
 .../master/TestAssignmentManagerOnCluster.java  |    2 +-
 .../hadoop/hbase/master/TestCatalogJanitor.java |   62 +-
 .../MasterProcedureTestingUtility.java          |    9 +
 .../procedure/TestAddColumnFamilyProcedure.java |   94 +-
 .../procedure/TestCreateTableProcedure.java     |   46 +-
 .../TestDeleteColumnFamilyProcedure.java        |  102 +-
 .../procedure/TestDeleteTableProcedure.java     |   48 +-
 .../procedure/TestDisableTableProcedure.java    |   44 +-
 .../procedure/TestEnableTableProcedure.java     |   47 +-
 .../TestModifyColumnFamilyProcedure.java        |   45 +-
 .../procedure/TestModifyTableProcedure.java     |   27 +-
 .../procedure/TestTruncateTableProcedure.java   |   15 +-
 31 files changed, 3039 insertions(+), 512 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4eb84651/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index ab73f50..f446d96 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -206,6 +206,8 @@ public class HBaseAdmin implements Admin {
 
   private RpcRetryingCallerFactory rpcCallerFactory;
 
+  private NonceGenerator ng;
+
   /**
    * Constructor.
    * See {@link #HBaseAdmin(Connection connection)}
@@ -262,6 +264,8 @@ public class HBaseAdmin implements Admin {
       "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min
 
     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
+
+    this.ng = this.connection.getNonceGenerator();
   }
 
   @Override
@@ -638,7 +642,8 @@ public class HBaseAdmin implements Admin {
         new MasterCallable<CreateTableResponse>(getConnection()) {
       @Override
       public CreateTableResponse call(int callTimeout) throws ServiceException {
-        CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys);
+        CreateTableRequest request = RequestConverter.buildCreateTableRequest(
+          desc, splitKeys, ng.getNonceGroup(), ng.newNonce());
         return master.createTable(null, request);
       }
     });
@@ -808,7 +813,8 @@ public class HBaseAdmin implements Admin {
         new MasterCallable<DeleteTableResponse>(getConnection()) {
       @Override
       public DeleteTableResponse call(int callTimeout) throws ServiceException {
-        DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName);
+        DeleteTableRequest req =
+            RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce());
         return master.deleteTable(null,req);
       }
     });
@@ -920,7 +926,7 @@ public class HBaseAdmin implements Admin {
       @Override
       public Void call(int callTimeout) throws ServiceException {
         TruncateTableRequest req = RequestConverter.buildTruncateTableRequest(
-          tableName, preserveSplits);
+          tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce());
         master.truncateTable(null, req);
         return null;
       }
@@ -1056,7 +1062,8 @@ public class HBaseAdmin implements Admin {
       @Override
       public EnableTableResponse call(int callTimeout) throws ServiceException {
         LOG.info("Started enable of " + tableName);
-        EnableTableRequest req = RequestConverter.buildEnableTableRequest(tableName);
+        EnableTableRequest req =
+            RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce());
         return master.enableTable(null,req);
       }
     });
@@ -1243,7 +1250,8 @@ public class HBaseAdmin implements Admin {
       @Override
       public DisableTableResponse call(int callTimeout) throws ServiceException {
         LOG.info("Started disable of " + tableName);
-        DisableTableRequest req = RequestConverter.buildDisableTableRequest(tableName);
+        DisableTableRequest req =
+            RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce());
         return master.disableTable(null, req);
       }
     });
@@ -1522,7 +1530,8 @@ public class HBaseAdmin implements Admin {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
       public Void call(int callTimeout) throws ServiceException {
-        AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, column);
+        AddColumnRequest req = RequestConverter.buildAddColumnRequest(
+          tableName, column, ng.getNonceGroup(), ng.newNonce());
         master.addColumn(null,req);
         return null;
       }
@@ -1569,7 +1578,8 @@ public class HBaseAdmin implements Admin {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
       public Void call(int callTimeout) throws ServiceException {
-        DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(tableName, columnName);
+        DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(
+          tableName, columnName, ng.getNonceGroup(), ng.newNonce());
         master.deleteColumn(null,req);
         return null;
       }
@@ -1618,7 +1628,8 @@ public class HBaseAdmin implements Admin {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
       public Void call(int callTimeout) throws ServiceException {
-        ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(tableName, descriptor);
+        ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(
+          tableName, descriptor, ng.getNonceGroup(), ng.newNonce());
         master.modifyColumn(null,req);
         return null;
       }
@@ -2453,7 +2464,8 @@ public class HBaseAdmin implements Admin {
     executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
       public Void call(int callTimeout) throws ServiceException {
-        ModifyTableRequest request = RequestConverter.buildModifyTableRequest(tableName, htd);
+        ModifyTableRequest request = RequestConverter.buildModifyTableRequest(
+          tableName, htd, ng.getNonceGroup(), ng.newNonce());
         master.modifyTable(null, request);
         return null;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4eb84651/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index fdc40a5..e9e54ba 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -1061,10 +1061,15 @@ public final class RequestConverter {
    * @return an AddColumnRequest
    */
   public static AddColumnRequest buildAddColumnRequest(
-      final TableName tableName, final HColumnDescriptor column) {
+      final TableName tableName,
+      final HColumnDescriptor column,
+      final long nonceGroup,
+      final long nonce) {
     AddColumnRequest.Builder builder = AddColumnRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
     builder.setColumnFamilies(column.convert());
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1076,10 +1081,15 @@ public final class RequestConverter {
    * @return a DeleteColumnRequest
    */
   public static DeleteColumnRequest buildDeleteColumnRequest(
-      final TableName tableName, final byte [] columnName) {
+      final TableName tableName,
+      final byte [] columnName,
+      final long nonceGroup,
+      final long nonce) {
     DeleteColumnRequest.Builder builder = DeleteColumnRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName((tableName)));
     builder.setColumnName(ByteStringer.wrap(columnName));
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1091,10 +1101,15 @@ public final class RequestConverter {
    * @return an ModifyColumnRequest
    */
   public static ModifyColumnRequest buildModifyColumnRequest(
-      final TableName tableName, final HColumnDescriptor column) {
+      final TableName tableName,
+      final HColumnDescriptor column,
+      final long nonceGroup,
+      final long nonce) {
     ModifyColumnRequest.Builder builder = ModifyColumnRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName((tableName)));
     builder.setColumnFamilies(column.convert());
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1176,9 +1191,14 @@ public final class RequestConverter {
    * @param tableName
    * @return a DeleteTableRequest
    */
-  public static DeleteTableRequest buildDeleteTableRequest(final TableName tableName) {
+  public static DeleteTableRequest buildDeleteTableRequest(
+      final TableName tableName,
+      final long nonceGroup,
+      final long nonce) {
     DeleteTableRequest.Builder builder = DeleteTableRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1189,11 +1209,16 @@ public final class RequestConverter {
    * @param preserveSplits True if the splits should be preserved
    * @return a TruncateTableRequest
    */
-  public static TruncateTableRequest buildTruncateTableRequest(final TableName tableName,
-        boolean preserveSplits) {
+  public static TruncateTableRequest buildTruncateTableRequest(
+      final TableName tableName,
+      final boolean preserveSplits,
+      final long nonceGroup,
+      final long nonce) {
     TruncateTableRequest.Builder builder = TruncateTableRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
     builder.setPreserveSplits(preserveSplits);
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1203,9 +1228,14 @@ public final class RequestConverter {
    * @param tableName
    * @return an EnableTableRequest
    */
-  public static EnableTableRequest buildEnableTableRequest(final TableName tableName) {
+  public static EnableTableRequest buildEnableTableRequest(
+      final TableName tableName,
+      final long nonceGroup,
+      final long nonce) {
     EnableTableRequest.Builder builder = EnableTableRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1215,9 +1245,14 @@ public final class RequestConverter {
    * @param tableName
    * @return a DisableTableRequest
    */
-  public static DisableTableRequest buildDisableTableRequest(final TableName tableName) {
+  public static DisableTableRequest buildDisableTableRequest(
+      final TableName tableName,
+      final long nonceGroup,
+      final long nonce) {
     DisableTableRequest.Builder builder = DisableTableRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName((tableName)));
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1229,7 +1264,10 @@ public final class RequestConverter {
    * @return a CreateTableRequest
    */
   public static CreateTableRequest buildCreateTableRequest(
-      final HTableDescriptor hTableDesc, final byte [][] splitKeys) {
+      final HTableDescriptor hTableDesc,
+      final byte [][] splitKeys,
+      final long nonceGroup,
+      final long nonce) {
     CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
     builder.setTableSchema(hTableDesc.convert());
     if (splitKeys != null) {
@@ -1237,6 +1275,8 @@ public final class RequestConverter {
         builder.addSplitKeys(ByteStringer.wrap(splitKey));
       }
     }
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1249,10 +1289,15 @@ public final class RequestConverter {
    * @return a ModifyTableRequest
    */
   public static ModifyTableRequest buildModifyTableRequest(
-      final TableName tableName, final HTableDescriptor hTableDesc) {
+      final TableName tableName,
+      final HTableDescriptor hTableDesc,
+      final long nonceGroup,
+      final long nonce) {
     ModifyTableRequest.Builder builder = ModifyTableRequest.newBuilder();
     builder.setTableName(ProtobufUtil.toProtoTableName((tableName)));
     builder.setTableSchema(hTableDesc.convert());
+    builder.setNonceGroup(nonceGroup);
+    builder.setNonce(nonce);
     return builder.build();
   }
 
@@ -1354,7 +1399,9 @@ public final class RequestConverter {
    * @param synchronous
    * @return a SetBalancerRunningRequest
    */
-  public static SetBalancerRunningRequest buildSetBalancerRunningRequest(boolean on, boolean synchronous) {
+  public static SetBalancerRunningRequest buildSetBalancerRunningRequest(
+      boolean on,
+      boolean synchronous) {
     return SetBalancerRunningRequest.newBuilder().setOn(on).setSynchronous(synchronous).build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4eb84651/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
new file mode 100644
index 0000000..9c7c72a
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+ /**
+   * This implementation is not smart and just treats nonce group and nonce as random bits.
+   */
+  // TODO: we could use pure byte arrays, but then we wouldn't be able to use hash map.
+@InterfaceAudience.Private
+public class NonceKey {
+  private long group;
+  private long nonce;
+
+  public NonceKey(long group, long nonce) {
+    assert nonce != HConstants.NO_NONCE;
+    this.group = group;
+    this.nonce = nonce;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || !(obj instanceof NonceKey)) {
+      return false;
+    }
+    NonceKey nk = ((NonceKey)obj);
+    return this.nonce == nk.nonce && this.group == nk.group;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int)((group >> 32) ^ group ^ (nonce >> 32) ^ nonce);
+  }
+
+  @Override
+  public String toString() {
+    return "[" + group + ":" + nonce + "]";
+  }
+
+  public long getNonceGroup() {
+    return group;
+  }
+
+  public long getNonce() {
+    return nonce;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4eb84651/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 601ebcd..a343c89 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.NonceKey;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -78,6 +80,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
   private RemoteProcedureException exception = null;
   private byte[] result = null;
 
+  private NonceKey nonceKey = null;
+
   /**
    * The main code of the procedure. It must be idempotent since execute()
    * may be called multiple time in case of machine failure in the middle
@@ -262,6 +266,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
     return parentProcId;
   }
 
+  public NonceKey getNonceKey() {
+    return nonceKey;
+  }
+
   /**
    * @return true if the procedure has failed.
    *         true may mean failed but not yet rolledback or failed and rolledback.
@@ -414,6 +422,15 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
   }
 
   /**
+   * Called by the ProcedureExecutor to set the value to the newly created procedure.
+   */
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  protected void setNonceKey(final NonceKey nonceKey) {
+    this.nonceKey = nonceKey;
+  }
+
+  /**
    * Internal method called by the ProcedureExecutor that starts the
    * user-level code execute().
    */
@@ -661,6 +678,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
       builder.setStateData(stateStream.toByteString());
     }
 
+    if (proc.getNonceKey() != null) {
+      builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
+      builder.setNonce(proc.getNonceKey().getNonce());
+    }
+
     return builder.build();
   }
 
@@ -712,6 +734,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
       proc.setResult(proto.getResult().toByteArray());
     }
 
+    if (proto.getNonce() != HConstants.NO_NONCE) {
+      NonceKey nonceKey = new NonceKey(proto.getNonceGroup(), proto.getNonce());
+      proc.setNonceKey(nonceKey);
+    }
+
     // we want to call deserialize even when the stream is empty, mainly for testing.
     proc.deserializeStateData(proto.getStateData().newInput());
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/4eb84651/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 49b658b..01e9a37 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
 import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.NonceKey;
 import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.common.base.Preconditions;
@@ -134,14 +136,17 @@ public class ProcedureExecutor<TEnvironment> {
     private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
 
     private final Map<Long, ProcedureResult> completed;
+    private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
     private final ProcedureStore store;
     private final Configuration conf;
 
     public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store,
-        final Map<Long, ProcedureResult> completedMap) {
+        final Map<Long, ProcedureResult> completedMap,
+        final Map<NonceKey, Long> nonceKeysToProcIdsMap) {
       // set the timeout interval that triggers the periodic-procedure
       setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL));
       this.completed = completedMap;
+      this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap;
       this.store = store;
       this.conf = conf;
     }
@@ -171,6 +176,11 @@ public class ProcedureExecutor<TEnvironment> {
           }
           store.delete(entry.getKey());
           it.remove();
+
+          NonceKey nonceKey = result.getNonceKey();
+          if (nonceKey != null) {
+            nonceKeysToProcIdsMap.remove(nonceKey);
+          }
         }
       }
     }
@@ -225,6 +235,13 @@ public class ProcedureExecutor<TEnvironment> {
     new ConcurrentHashMap<Long, Procedure>();
 
   /**
+   * Helper map to lookup whether the procedure already issued from the same client.
+   * This map contains every root procedure.
+   */
+  private ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap =
+      new ConcurrentHashMap<NonceKey, Long>();
+
+  /**
    * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state
    * or periodic procedures.
    */
@@ -312,6 +329,11 @@ public class ProcedureExecutor<TEnvironment> {
       proc.beforeReplay(getEnvironment());
       procedures.put(proc.getProcId(), proc);
 
+      // add the nonce to the map
+      if (proc.getNonceKey() != null) {
+        nonceKeysToProcIdsMap.put(proc.getNonceKey(), proc.getProcId());
+      }
+
       if (proc.getState() == ProcedureState.RUNNABLE) {
         runnablesCount++;
       }
@@ -343,6 +365,7 @@ public class ProcedureExecutor<TEnvironment> {
         assert !rollbackStack.containsKey(proc.getProcId());
         procedures.remove(proc.getProcId());
         completed.put(proc.getProcId(), newResultFromProcedure(proc));
+
         continue;
       }
 
@@ -479,7 +502,8 @@ public class ProcedureExecutor<TEnvironment> {
     }
 
     // Add completed cleaner
-    waitingTimeout.add(new CompletedProcedureCleaner(conf, store, completed));
+    waitingTimeout.add(
+      new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
   }
 
   public void stop() {
@@ -510,6 +534,7 @@ public class ProcedureExecutor<TEnvironment> {
     completed.clear();
     rollbackStack.clear();
     procedures.clear();
+    nonceKeysToProcIdsMap.clear();
     waitingTimeout.clear();
     runnables.clear();
     lastProcId.set(-1);
@@ -552,13 +577,53 @@ public class ProcedureExecutor<TEnvironment> {
    * @return the procedure id, that can be used to monitor the operation
    */
   public long submitProcedure(final Procedure proc) {
+    return submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  }
+
+  /**
+   * Add a new root-procedure to the executor.
+   * @param proc the new procedure to execute.
+   * @param nonceGroup
+   * @param nonce
+   * @return the procedure id, that can be used to monitor the operation
+   */
+  public long submitProcedure(
+      final Procedure proc,
+      final long nonceGroup,
+      final long nonce) {
     Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
     Preconditions.checkArgument(isRunning());
     Preconditions.checkArgument(lastProcId.get() >= 0);
     Preconditions.checkArgument(!proc.hasParent());
 
-    // Initialize the Procedure ID
-    proc.setProcId(nextProcId());
+    Long currentProcId;
+
+    // The following part of the code has to be synchronized to prevent multiple request
+    // with the same nonce to execute at the same time.
+    synchronized (this) {
+      // Check whether the proc exists.  If exist, just return the proc id.
+      // This is to prevent the same proc to submit multiple times (it could happen
+      // when client could not talk to server and resubmit the same request).
+      NonceKey noncekey = null;
+      if (nonce != HConstants.NO_NONCE) {
+        noncekey = new NonceKey(nonceGroup, nonce);
+        currentProcId = nonceKeysToProcIdsMap.get(noncekey);
+        if (currentProcId != null) {
+          // Found the proc
+          return currentProcId;
+        }
+      }
+
+      // Initialize the Procedure ID
+      currentProcId = nextProcId();
+      proc.setProcId(currentProcId);
+
+      // This is new procedure. Set the noncekey and insert into the map.
+      if (noncekey != null) {
+        proc.setNonceKey(noncekey);
+        nonceKeysToProcIdsMap.put(noncekey, currentProcId);
+      }
+    } // end of synchronized (this)
 
     // Commit the transaction
     store.insert(proc, null);
@@ -568,14 +633,14 @@ public class ProcedureExecutor<TEnvironment> {
 
     // Create the rollback stack for the procedure
     RootProcedureState stack = new RootProcedureState();
-    rollbackStack.put(proc.getProcId(), stack);
+    rollbackStack.put(currentProcId, stack);
 
     // Submit the new subprocedures
-    assert !procedures.containsKey(proc.getProcId());
-    procedures.put(proc.getProcId(), proc);
-    sendProcedureAddedNotification(proc.getProcId());
+    assert !procedures.containsKey(currentProcId);
+    procedures.put(currentProcId, proc);
+    sendProcedureAddedNotification(currentProcId);
     runnables.addBack(proc);
-    return proc.getProcId();
+    return currentProcId;
   }
 
   public ProcedureResult getResult(final long procId) {
@@ -1162,8 +1227,10 @@ public class ProcedureExecutor<TEnvironment> {
 
   private static ProcedureResult newResultFromProcedure(final Procedure proc) {
     if (proc.isFailed()) {
-      return new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getException());
+      return new ProcedureResult(
+        proc.getNonceKey(), proc.getStartTime(), proc.getLastUpdate(), proc.getException());
     }
-    return new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getResult());
+    return new ProcedureResult(
+      proc.getNonceKey(), proc.getStartTime(), proc.getLastUpdate(), proc.getResult());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4eb84651/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
index 98c293b..ff5407f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.procedure2;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.NonceKey;
 
 /**
  * Once a Procedure completes the ProcedureExecutor takes all the useful
@@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class ProcedureResult {
+  private final NonceKey nonceKey;
   private final RemoteProcedureException exception;
   private final long lastUpdate;
   private final long startTime;
@@ -37,21 +39,39 @@ public class ProcedureResult {
 
   private long clientAckTime = -1;
 
-  public ProcedureResult(final long startTime, final long lastUpdate,
+  public ProcedureResult(
+      final NonceKey nonceKey,
+      final long startTime,
+      final long lastUpdate,
       final RemoteProcedureException exception) {
-    this.lastUpdate = lastUpdate;
-    this.startTime = startTime;
-    this.exception = exception;
-    this.result = null;
+    this(nonceKey, exception, lastUpdate, startTime, null);
+  }
+
+  public ProcedureResult(
+      final NonceKey nonceKey,
+      final long startTime,
+      final long lastUpdate,
+      final byte[] result) {
+    this(nonceKey, null, lastUpdate, startTime, result);
   }
 
-  public ProcedureResult(final long startTime, final long lastUpdate, final byte[] result) {
+  public ProcedureResult(
+      final NonceKey nonceKey,
+      final RemoteProcedureException exception,
+      final long lastUpdate,
+      final long startTime,
+      final byte[] result) {
+    this.nonceKey = nonceKey;
+    this.exception = exception;
     this.lastUpdate = lastUpdate;
     this.startTime = startTime;
-    this.exception = null;
     this.result = result;
   }
 
+  public NonceKey getNonceKey() {
+    return nonceKey;
+  }
+
   public boolean isFailed() {
     return exception != null;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4eb84651/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index a90e056..89a6434 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
@@ -117,7 +118,7 @@ public class ProcedureTestingUtility {
     procStore.start(1);
     procExecutor.start(1, false);
     try {
-      return submitAndWait(procExecutor, proc);
+      return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
     } finally {
       procStore.stop(false);
       procExecutor.stop();
@@ -125,7 +126,13 @@ public class ProcedureTestingUtility {
   }
 
   public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
-    long procId = procExecutor.submitProcedure(proc);
+    return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  }
+
+  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
+      final long nonceGroup,
+      final long nonce) {
+    long procId = procExecutor.submitProcedure(proc, nonceGroup, nonce);
     waitProcedure(procExecutor, procId);
     return procId;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4eb84651/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index 7735b63..145f7f3 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Assert;
@@ -76,6 +75,9 @@ public class TestProcedureRecovery {
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
     procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
     procSleepInterval = 0;
+
+    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false);
+    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false);
   }
 
   @After
@@ -285,6 +287,41 @@ public class TestProcedureRecovery {
     ProcedureTestingUtility.assertIsAbortException(result);
   }
 
+  @Test(timeout=30000)
+  public void testCompletedProcWithSameNonce() throws Exception {
+    final long nonceGroup = 123;
+    final long nonce = 2222;
+    Procedure proc = new TestSingleStepProcedure();
+    // Submit a proc and wait for its completion
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
+
+    // Restart
+    restart();
+    Procedure proc2 = new TestSingleStepProcedure();
+    // Submit a procedure with the same nonce and expect the same procedure would return.
+    long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
+    assertTrue(procId == procId2);
+
+    ProcedureResult result = procExecutor.getResult(procId2);
+    ProcedureTestingUtility.assertProcNotFailed(result);
+  }
+
+  @Test(timeout=30000)
+  public void testRunningProcWithSameNonce() throws Exception {
+    final long nonceGroup = 456;
+    final long nonce = 33333;
+    Procedure proc = new TestMultiStepProcedure();
+    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
+
+    // Restart
+    restart();
+    Procedure proc2 = new TestMultiStepProcedure();
+    // Submit a procedure with the same nonce and expect the same procedure would return.
+    long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
+    // The original proc is not completed and the new submission should have the same proc Id.
+    assertTrue(procId == procId2);
+  }
+
   public static class TestStateMachineProcedure
       extends StateMachineProcedure<Void, TestStateMachineProcedure.State> {
     enum State { STATE_1, STATE_2, STATE_3, DONE }


Mime
View raw message